Posted to dev@flink.apache.org by Paris Carbone <pa...@kth.se> on 2016/11/10 19:19:40 UTC

[DISCUSS] FLIP-14: Loops API and Termination

Hi again Flink folks,

Here is our new proposal that addresses Job Termination - the loop fault tolerance proposal will follow shortly.
As Stephan hinted, we need operators to be aware of their scope level.

Thus, it is time we make loops great again! :)

Part of this FLIP basically introduces a new functional, compositional API for defining asynchronous loops for DataStreams.
This is coupled with a decentralized algorithm for job termination with loops - along the lines of what Stephan described.
We are already working on the actual prototypes as you can observe in the links of the doc.

Please let us know if you like (or don't like) it and why, in this mail discussion.

https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-PfTHtq3173EhsAkpBoQ

cheers
Paris and Fouad

On 31 Oct 2016, at 12:53, Paris Carbone <pa...@kth.se> wrote:

Hey Stephan,

Thanks for looking into it!

+1 for breaking this up, will do that.

I can see your point and maybe it makes sense to introduce part of scoping to incorporate support for nested loops (otherwise it can’t work).
Let us think about this a bit. We will share another draft with a more detailed description of the approach you are suggesting asap.


On 27 Oct 2016, at 10:55, Stephan Ewen <se...@apache.org> wrote:

How about we break this up into two FLIPs? There are after all two
orthogonal problems (termination, fault tolerance) with quite different
discussion states.

Concerning fault tolerance, I like the ideas.
For the termination proposal, I would like to iterate a bit more.

*Termination algorithm:*

My main concern here is the introduction of a termination coordinator and
any involvement of RPC messages when deciding termination.
That would be such a fundamental break with the current runtime
architecture, and it would make the currently very elegant and simple model
much more complicated and harder to maintain. Given that Flink's runtime is
complex enough, I would really like to avoid that.

The current runtime paradigm coordinates between operators strictly via
in-band events. RPC calls happen between operators and the master for
triggering and acknowledging execution and checkpoints.

I was wondering whether we can keep following that paradigm and still get
most of what you are proposing here. In some sense, all we need to do is
replace RPC calls with in-band events, and "decentralize" the coordinator
such that every operator can make its own termination decision by itself.

This is only a rough sketch, you probably need to flesh it out more.

- I assume that the OP in the diagram knows that it is in a loop and that
it is the one connected to the head and tail

- When OP receives an EndOfStream event from the regular source (RS), it
emits an "AttemptTermination" event downstream to the operators involved in
the loop. It attaches an attempt sequence number and memorizes it
- Tail and Head forward these events
- When OP receives the event back with the same attempt sequence number,
and no records came in the meantime, it shuts down and emits EndOfStream
downstream
- When other records came back between emitting the AttemptTermination
event and receiving it back, then it emits a new AttemptTermination event
with the next sequence number.
- This should terminate as soon as the loop is empty.
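
As an illustration only, the sketch above can be simulated in a few lines of Python. This is not Flink code: the single-threaded FIFO queue standing in for the feedback channel, the run_loop name and the hops parameter are all assumptions made for the example. It relies on the feedback channel being FIFO, so an attempt event that returns without any record in front of it implies an empty loop.

```python
from collections import deque

RECORD, ATTEMPT = "record", "attempt"


def run_loop(records, hops):
    """Simulate OP -> Tail -> Head -> OP over one FIFO feedback channel.

    Each record returns to OP `hops` times before it leaves the loop.
    Returns how many AttemptTermination events OP emitted before it
    could shut down.
    """
    feedback = deque()  # in-band channel; Tail and Head just forward

    # Regular input from RS: OP pushes every record into the loop.
    for rec in records:
        if hops > 0:
            feedback.append((RECORD, rec, hops))

    # EndOfStream from RS: OP starts the first termination attempt.
    seq = 1
    seen_record = False  # did a record arrive since the last attempt?
    feedback.append((ATTEMPT, seq, None))

    while True:
        kind, payload, remaining = feedback.popleft()
        if kind == RECORD:
            seen_record = True
            if remaining > 1:
                # The record still has to circulate: send it around again.
                feedback.append((RECORD, payload, remaining - 1))
        elif payload == seq:
            if not seen_record:
                return seq  # loop is empty: emit EndOfStream downstream
            # Records came back in the meantime: try again.
            seq += 1
            seen_record = False
            feedback.append((ATTEMPT, seq, None))
```

With one record that circulates three times, run_loop(["a"], 3) needs four attempts: the first three are invalidated by the returning record, and the fourth comes back to an empty loop.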

Might this model even generalize to nested loops, where the
"AttemptTermination" event is scoped by the loop's nesting level?

Let me know what you think!


Best,
Stephan


On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <se...@apache.org> wrote:

Hi!

I am still scanning it and compiling some comments. Give me a bit ;-)

Stephan


On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <pa...@kth.se> wrote:

Hey all,

Now that many of you have already scanned the document (judging from the
views) maybe it is time to give some feedback!
Did you like it? Would you suggest an improvement?

I would suggest not to leave this in the void. It has to do with
important properties that the system promises to provide.
Fouad and I will do our best to answer your questions and discuss this
further.

cheers
Paris

On 21 Oct 2016, at 08:54, Paris Carbone <pa...@kth.se> wrote:

Hello everyone,

Loops in Apache Flink have good potential to become much more
powerful in future versions of Apache Flink.
There is generally high demand to make them usable and first of all
production-ready for upcoming releases.

As a first commitment we would like to propose FLIP-13 for consistent
processing with Loops.
We are also working on scoped loops for Q1 2017 which we can share if
there is enough interest.

For now, this is an improvement proposal that solves two pending major
issues:

1) The (not so trivial) problem of correct termination of jobs with
iterations
2) The applicability of the checkpointing algorithm to iterative dataflow
graphs.

We would really appreciate it if you go through the linked draft
(motivation and proposed changes) for FLIP-13 and point out comments,
preferably publicly in this devlist discussion before we go ahead and
update the wiki.

https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing

cheers

Paris and Fouad







Re: [DISCUSS] FLIP-14: Loops API and Termination

Posted by Paris Carbone <pa...@kth.se>.
Hey Gabor,

These are some meaningful concerns! 

Joining data outside one's scope is somewhat similar to a “goto” statement in imperative code. It certainly allows more complex data flows, but then programs become arbitrary (nested structures lose their meaning).
On top of that, in a decentralized execution there are practical reasons for scopes, such as decentralized progress tracking and establishing termination (Timely Dataflow went to strict scopes for the same reasons in the past).

Given the current proposal:
For multiple outputs, you can always emit everything as one stream and then split and repartition it outside the loop scope in a structured way, so this is not a big issue.
Regarding binary input operators across scopes, you can pre-join those streams before starting a new scope/loop.
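
A rough sketch of the single-output workaround, in plain Python rather than the DataStream API. The Tagged wrapper, the tag names and both functions are hypothetical and only illustrate the idea of tagging elements inside the loop and splitting them outside of it.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Iterator, List


@dataclass(frozen=True)
class Tagged:
    """Hypothetical envelope: one output type that carries several logical outputs."""
    tag: str
    value: Any


def loop_body(values: Iterable[int]) -> Iterator[Tagged]:
    """Stand-in for the loop: emits everything on a single output stream."""
    for v in values:
        if v % 2 == 0:
            yield Tagged("even", v)
        else:
            yield Tagged("odd", v)


def split_by_tag(stream: Iterable[Tagged]) -> Dict[str, List[Any]]:
    """Runs outside the loop scope and recovers the logical outputs."""
    outputs: Dict[str, List[Any]] = {}
    for item in stream:
        outputs.setdefault(item.tag, []).append(item.value)
    return outputs
```

Here split_by_tag(loop_body([1, 2, 3, 4])) yields one list per tag, so the loop itself still has exactly one output stream.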

Another approach we are looking into is to decouple scopes and loops, which would allow you to define a custom scope with all the streams you want to use registered in it.
We are preparing a prototype for that approach too so we can have it as an alternative, hopefully in the same FLIP soon.

cheers
Paris


> On 15 Dec 2016, at 15:52, Gábor Gévay <gg...@gmail.com> wrote:
> 
> Hi Paris and Fouad,
> 
> I finally had some time to delve into this. Thanks for the nice proposal!
> 
> +1 for also having a CoLoopFunction. That might be useful even if the
> input and feedback have the same type, as it might happen that I want
> to treat the elements coming on the feedback in a different way from
> the input.
> 
> I have some questions about StreamScope and the restrictions based on them:
> 
>> All binary operations can and should be restricted on data streams of the same StreamScope.
> 
> I'm wondering whether this might prevent some valid use cases, as it
> essentially restricts information getting into the loop to only the
> "official" input stream (i.e. the stream that is given to the loop
> method of the LoopFunction). For example, it might happen that I'm in
> some intermediate node in an iteration, but I also want to use some
> info from a stream in scope 0 that has nothing to do with the stream
> that is the "official" input of my iteration. So what I essentially
> mean is what if my iteration has multiple inputs? (Note: I actually do
> have such topologies in the paper that I'm currently writing.)
> 
> A similar concern is what if I would like to get out some information
> from the loop besides the "official" output stream that is being
> returned from LoopFunction.loop. That is, what if I want to have more
> than one output stream from an iteration?
> 
> Best,
> Gábor
> 
> 
> 
> 
> 2016-12-12 12:13 GMT+01:00 Paris Carbone <pa...@kth.se>:
>> Hi again folks,
>> 
>> It has been almost a month so I would like to remind you about the loop redesign which many of you have checked already.
>> Anyone else excited about iterative streams and wants to add a comment or simply +1 this? It would be more than welcome :)
>> 
>> You can find the description in the wiki [1] and the implementation as well.
>> We have put a lot of time into this and I think there is potential for even more future improvements when it comes to iterative computation.
>> 
>> cheers
>> Paris
>> 
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-15+Scoped+Loops+and+Job+Termination
>> 
>> On 17 Nov 2016, at 14:00, Paris Carbone <pa...@kth.se> wrote:
>> 
>> That was fast!  Seems to be working.
>> Thank you Fabian!
>> 
>> On 17 Nov 2016, at 13:58, Fabian Hueske <fh...@gmail.com> wrote:
>> 
>> Hi Paris,
>> 
>> just gave you the permissions (I hope).
>> Let me know if something does not work.
>> 
>> Cheers, Fabian
>> 
>> 2016-11-17 13:48 GMT+01:00 Paris Carbone <pa...@kth.se>:
>> 
>> We do not have to schedule this for an early Flink release, just saying.
>> I would just like to get the changes out and you people can review it and
>> integrate it anytime at your own pace.
>> 
>> Who is the admin of the wiki? It would be nice to get write access.
>> 
>> On 17 Nov 2016, at 13:45, Paris Carbone <pa...@kth.se> wrote:
>> 
>> Sounds like a plan!
>> 
>> Can someone grant me access to write in the wiki please?
>> My username is “senorcarbone”.
>> 
>> Paris
>> 
>> On 16 Nov 2016, at 14:30, Gyula Fóra <gy...@gmail.com> wrote:
>> 
>> I am not completely sure whether we should deprecate the old API for 1.2 or
>> remove it completely. Personally I am in favor of removing it; I don't
>> think it is a huge burden to move to the new one if it makes for a much
>> nicer user experience.
>>
>> I think you can go ahead and add the FLIP to the wiki and open the PR so we can
>> start the review, if you have it ready anyway.
>> 
>> Gyula
>> 
>> On Wed, 16 Nov 2016 at 11:55, Paris Carbone <pa...@kth.se> wrote:
>> 
>> Thanks for reviewing, Gyula.
>> 
>> One thing that is still up for discussion is whether we should completely
>> remove the old iterations API or simply mark it as deprecated until v2.0.
>> Also, I am not sure what the best process is now. We have the changes ready.
>> Should I copy the FLIP to the wiki and trigger the PRs, or wait a few
>> more days in case someone has objections?
>> 
>> @Stephan, what is your take on our interpretation of the approach you
>> suggested? Should we proceed or is there anything that you do not find
>> nice?
>> 
>> Paris
>> 
>> On 15 Nov 2016, at 10:01, Gyula Fóra <gy...@apache.org> wrote:
>> 
>> Hi Paris,
>> 
>> I like the proposed changes to the iteration API; this cleans things up in
>> the Java API without any strict restriction, I think (it was never a problem
>> in the Scala API).
>>
>> The termination algorithm based on the proposed scoped loops seems to be
>> fairly simple and looks good :)
>> 
>> Cheers,
>> Gyula
>> 
>> On Mon, 14 Nov 2016 at 8:50, Paris Carbone <pa...@kth.se> wrote:
>> 
>> That would be great Shi! Let's take that offline.
>> 
>> Anyone else interested in the iteration changes? It would be nice to
>> incorporate these into v1.2 if possible, so I am counting on your review asap.
>> 
>> cheers,
>> Paris
>> 
>> On Nov 14, 2016, at 2:59 AM, xiaogang.sxg <xiaogang.sxg@alibaba-inc.com> wrote:
>> 
>> Hi Paris
>> 
>> Unfortunately, the project is not public yet.
>> But I can provide you with a primitive implementation of the update protocol in
>> the paper. It is implemented in Storm. Since the protocol assumes the
>> communication channels between different tasks are dual, I think it is not
>> easy to adapt it to Flink.
>> 
>> Regards
>> Xiaogang
>> 
>> 
>> On 12 Nov 2016, at 3:03 AM, Paris Carbone <parisc@kth.se> wrote:
>> 
>> Hi Shi,
>> 
>> Naiad/Timely Dataflow and other projects use global coordination, which is
>> very convenient for asynchronous progress tracking in general, but it has
>> some downsides in production systems that count on in-flight
>> transactional control mechanisms and rollback recovery guarantees. This is
>> why we generally prefer decentralized approaches (despite their own
>> downsides).
>>
>> Regarding synchronous/structured iterations, this is a bit off topic and
>> they are a bit of a different story, as you already know.
>> We maintain a graph streaming (gelly-streams) library on Flink that you
>> might find interesting [1]. Vasia, another Flink committer, is also working
>> on that, among others.
>> You can keep an eye on it since we are planning to use this project as a
>> showcase for a new way of doing structured and fixpoint iterations on
>> streams in the future.
>>
>> P.S. Many thanks for sharing your publication; it was an interesting read.
>> Do you happen to have your source code public? We could most certainly use
>> it in a benchmark soon.
>> 
>> [1] https://github.com/vasia/gelly-streaming
>> 
>> 
>> On 11 Nov 2016, at 19:18, SHI Xiaogang <shixiaogangg@gmail.com> wrote:
>> 
>> Hi, Fouad
>> 
>> Thank you for the explanation. Now the centralized method seems correct to
>> me.
>> The passing of StatusUpdate events will lead to synchronous iterations, and
>> we are using the information in each iteration to terminate the
>> computation.
>>
>> Actually, I prefer the centralized method because in many applications the
>> convergence may depend on some global statistics.
>> For example, a PageRank program may terminate the computation when 99% of
>> the vertices have converged.
>> I think those learning programs which cannot reach the fixed point
>> (oscillating around the fixed point) can benefit a lot from such features.
>> The decentralized method makes it hard to support such convergence
>> conditions.
>>
>>
>> Another concern is that Flink cannot produce periodic results in the
>> iteration over infinite data streams.
>> Take a concrete example. Given an edge stream constructing a graph, the
>> user may need the PageRank weight of each vertex in the graphs formed at
>> certain instants.
>> Currently Flink does not provide any input or iteration information to
>> users, making it hard for users to implement such real-time iterative
>> applications.
>> Such features are supported in both Naiad and Tornado. I think Flink should
>> support them as well.
>> 
>> What do you think?
>> 
>> Regards
>> Xiaogang
>> 
>> 
>> 2016-11-11 19:27 GMT+08:00 Fouad ALi <fouad.alsayadi@gmail.com>:
>> 
>> Hi Shi,
>> 
>> It seems that you are referring to the centralized algorithm, which is no
>> longer the proposed version.
>> In the decentralized version (check the latest doc) there is no master node or
>> global coordination involved.
>>
>> Let us keep this discussion to the decentralized one if possible.
>>
>> To answer your points on the previous approach, there is a catch in your
>> trace at t7. Here is what is happening:
>> - Head, as well as RS, will receive a 'BroadcastStatusUpdate' from the
>> runtime (see 2.1 in the steps).
>> - RS and Heads will broadcast a StatusUpdate event and will not notify their
>> status.
>> - When the StatusUpdate event gets back to the head, it will notify its
>> WORKING status.
>> 
>> Hope that answers your concern.
>> 
>> Best,
>> Fouad
>> 
>> On Nov 11, 2016, at 6:21 AM, SHI Xiaogang <shixiaogangg@gmail.com> wrote:
>> 
>> Hi Paris
>> 
>> I have several concerns about the correctness of the termination protocol.
>> I think the termination protocol puts an end to the computation even when
>> the computation has not converged.
>>
>> Suppose there exists a loop context constructed by an OP operator, a Head
>> operator and a Tail operator (illustrated in Figure 2 in the first draft).
>> The stream contains only one record. OP will pass the record to its
>> downstream operators 10 times. In other words, the loop should iterate 10
>> times.
>> 
>> If I understood the protocol correctly, the following event sequence may
>> happen in the computation:
>> t1: RS emits Record to OP. Since RS has reached the "end-of-stream", the
>> system enters the Speculative Phase.
>> t2: OP receives Record and emits it to TAIL.
>> t3: HEAD receives the UpdateStatus event, and notifies with an IDLE state.
>> t4: OP receives the UpdateStatus event from HEAD, and notifies with a
>> WORKING state.
>> t5: TAIL receives Record and emits it to HEAD.
>> t6: TAIL receives the UpdateStatus event from OP, and notifies with a
>> WORKING state.
>> t7: The system starts a new attempt. HEAD receives the UpdateStatus event
>> and notifies with an IDLE state. (Record is still in transit.)
>> t8: OP receives the UpdateStatus event from HEAD and notifies with an IDLE
>> state.
>> t9: TAIL receives the UpdateStatus event from OP and notifies with an IDLE
>> state.
>> t10: HEAD receives Record from TAIL and emits it to OP.
>> t11: The system puts an end to the computation.
>> 
>> Though the computation is expected to iterate 10 times, it ends earlier.
>> The cause is that the communication channels MASTER=>HEAD and TAIL=>HEAD
>> are not synchronized.
>>
>> I think the protocol follows the idea of the Chandy-Lamport algorithm to
>> determine a global state.
>> But the information of whether a node has processed any record since the
>> last request is not STABLE.
>> Hence I doubt the correctness of the protocol.
>>
>> To determine the termination correctly, we need some information that is
>> stable.
>> In Timely Dataflow, Naiad collects the progress made in each iteration and
>> terminates the loop when little progress is made in an iteration
>> (identified by the timestamp vector).
>> The information is stable because the result of an iteration cannot be
>> changed by the execution of later iterations.
>>
>> A similar method is also adopted in Tornado.
>> You may see my paper for more details about the termination of loops:
>> http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf
>> 
>> Regards
>> Xiaogang


Re: [DISCUSS] FLIP-14: Loops API and Termination

Posted by Gábor Gévay <gg...@gmail.com>.
Hi Paris and Fouad,

I finally had some time to delve into this. Thanks for the nice proposal!

+1 for also having a CoLoopFunction. That might be useful even if the
input and feedback have the same type, as it might happen that I want
to treat the elements coming on the feedback in a different way from
the input.
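
To make the point concrete, here is a minimal Python sketch of what such a function could look like. It is not the API from the proposal: the HalvingCoLoop class, its on_input/on_feedback methods and the run_co_loop driver are made-up names, and the driver is a single-threaded stand-in for the runtime. Input and feedback are both plain integers, yet they are handled differently.

```python
from collections import deque


class HalvingCoLoop:
    """Hypothetical co-loop function with one handler per side."""

    def __init__(self):
        self.inputs_seen = 0
        self.feedback_seen = 0

    def on_input(self, value):
        # Elements from outside are sanitized exactly once, then fed back.
        self.inputs_seen += 1
        return [], [abs(value)]  # (outputs, feedback)

    def on_feedback(self, value):
        # Elements on the feedback edge are already sanitized: just iterate.
        self.feedback_seen += 1
        if value <= 1:
            return [value], []   # converged: leaves the loop
        return [], [value // 2]  # not yet: goes around again


def run_co_loop(fn, inputs):
    """Toy driver: drain the input first, then the feedback queue."""
    outputs, feedback = [], deque()
    for v in inputs:
        out, fb = fn.on_input(v)
        outputs.extend(out)
        feedback.extend(fb)
    while feedback:
        out, fb = fn.on_feedback(feedback.popleft())
        outputs.extend(out)
        feedback.extend(fb)
    return outputs
```

With a single handler, the function would have to guess from the value itself whether an element is fresh input or a returning one, which is not possible when both sides share a type.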

I have some questions about StreamScope and the restrictions based on them:

> All binary operations can and should be restricted on data streams of the same StreamScope.

I'm wondering whether this might prevent some valid use cases, as it
essentially restricts information getting into the loop to only the
"official" input stream (i.e. the stream that is given to the loop
method of the LoopFunction). For example, it might happen that I'm in
some intermediate node in an iteration, but I also want to use some
info from a stream in scope 0 that has nothing to do with the stream
that is the "official" input of my iteration. So what I essentially
mean is what if my iteration has multiple inputs? (Note: I actually do
have such topologies in the paper that I'm currently writing.)
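
For reference, this is how I read the quoted restriction, as a small Python sketch. The ScopedStream class, the tuple encoding of scopes and the connect function are my own assumptions for illustration and are not taken from the proposal or from Flink.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class ScopedStream:
    name: str
    scope: Tuple[int, ...] = ()  # () stands for the outermost scope ("scope 0")

    def enter_loop(self, loop_id: int) -> "ScopedStream":
        """View of this stream inside a (possibly nested) loop."""
        return ScopedStream(self.name, self.scope + (loop_id,))


def connect(a: ScopedStream, b: ScopedStream) -> ScopedStream:
    """Binary operation that is only allowed within a single scope."""
    if a.scope != b.scope:
        raise ValueError(
            f"cannot connect {a.name} (scope {a.scope}) "
            f"with {b.name} (scope {b.scope})"
        )
    return ScopedStream(f"{a.name}+{b.name}", a.scope)
```

Under this reading, connecting a stream inside the loop with a scope 0 stream is rejected, while connecting the two streams in scope 0 and only then entering the loop is accepted.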

A similar concern is what if I would like to get out some information
from the loop besides the "official" output stream that is being
returned from LoopFunction.loop. That is, what if I want to have more
than one output stream from an iteration?

Best,
Gábor




2016-12-12 12:13 GMT+01:00 Paris Carbone <pa...@kth.se>:
> Hi again folks,
>
> It has been almost a month so I would like to remind you about the loop redesign which many of you have checked already.
> Anyone else excited about iterative streams and wants to add a comment or simply +1 this? It would be more than welcome :)
>
> You can find the description in the wiki [1] and the implementation as well.
> We have put a lot of time on this and I think there is potential for even more future improvements when it comes to iterative computation.
>
> cheers
> Paris
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-15+Scoped+Loops+and+Job+Termination
>
> On 17 Nov 2016, at 14:00, Paris Carbone <pa...@kth.se>> wrote:
>
> That was fast!  Seems to be working.
> Thank you Fabian!
>
> On 17 Nov 2016, at 13:58, Fabian Hueske <fh...@gmail.com>> wrote:
>
> Hi Paris,
>
> just gave you the permissions (I hope).
> Let me know if something does not work.
>
> Cheers, Fabian
>
> 2016-11-17 13:48 GMT+01:00 Paris Carbone <pa...@kth.se>>:
>
> We do not have to schedule this for an early Flink release, just saying.
> I would just like to get the changes out and you people can review it and
> integrate it anytime at your own pace.
>
> Who is the admin of the wiki? It would be nice to get write access.
>
> On 17 Nov 2016, at 13:45, Paris Carbone <pa...@kth.se>> wrote:
>
> Sounds like a plan!
>
> Can someone grant me access to write in the wiki please?
> My username is “senorcarbone”.
>
> Paris
>
> On 16 Nov 2016, at 14:30, Gyula Fóra <gy...@gmail.com>> wrote:
>
> I am not completely sure whether we should deprecate the old API for
> 1.2 or
> remove it completely. Personally I am in favor of removing it, I don't
> think it is a huge burden to move to the new one if it makes for a much
> nicer user experience.
>
> I think you can go ahead add the FLIP to the wiki and open the PR so we
> can
> start the review if you have it ready anyways.
>
> Gyula
>
> Paris Carbone <pa...@kth.se>> ezt írta (időpont: 2016. nov. 16., Sze,
> 11:55):
>
> Thanks for reviewing, Gyula.
>
> One thing that is still up to discussion is whether we should remove
> completely the old iterations API or simply mark it as deprecated till
> v2.0.
> Also, not sure what is the best process now. We have the changes ready.
> Should I copy the FLIP to the wiki and trigger the PRs or wait for a
> few
> more days in case someone has objections?
>
> @Stephan, what is your take on our interpretation of the approach you
> suggested? Should we proceed or is there anything that you do not find
> nice?
>
> Paris
>
> On 15 Nov 2016, at 10:01, Gyula Fóra <gy...@apache.org>> wrote:
>
> Hi Paris,
>
> I like the proposed changes to the iteration API, this cleans up
> things
> in
> the Java API without any strict restriction I think (it was never a
> problem
> in the Scala API).
>
> The termination algorithm based on the proposed scoped loops seems to
> be
> fairly simple and looks good :)
>
> Cheers,
> Gyula

Re: [DISCUSS] FLIP-14: Loops API and Termination

Posted by Paris Carbone <pa...@kth.se>.
Hi again folks,

It has been almost a month, so I would like to remind you about the loop redesign, which many of you have checked already.
Is anyone else excited about iterative streams and willing to add a comment or simply +1 this? It would be more than welcome :)

You can find the description, as well as the implementation, in the wiki [1].
We have put a lot of time into this and I think there is potential for even more future improvements when it comes to iterative computation.

cheers
Paris

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-15+Scoped+Loops+and+Job+Termination

On 17 Nov 2016, at 14:00, Paris Carbone <pa...@kth.se> wrote:

That was fast!  Seems to be working.
Thank you Fabian!

On 17 Nov 2016, at 13:58, Fabian Hueske <fh...@gmail.com> wrote:

Hi Paris,

just gave you the permissions (I hope).
Let me know if something does not work.

Cheers, Fabian

2016-11-17 13:48 GMT+01:00 Paris Carbone <pa...@kth.se>:

We do not have to schedule this for an early Flink release, just saying.
I would just like to get the changes out and you people can review it and
integrate it anytime at your own pace.

Who is the admin of the wiki? It would be nice to get write access.

On 17 Nov 2016, at 13:45, Paris Carbone <pa...@kth.se> wrote:

Sounds like a plan!

Can someone grant me access to write in the wiki please?
My username is “senorcarbone”.

Paris

On 16 Nov 2016, at 14:30, Gyula Fóra <gy...@gmail.com> wrote:

I am not completely sure whether we should deprecate the old API for 1.2
or remove it completely. Personally I am in favor of removing it; I don't
think it is a huge burden to move to the new one if it makes for a much
nicer user experience.

I think you can go ahead and add the FLIP to the wiki and open the PR, so
we can start the review, if you have it ready anyway.

Gyula

Paris Carbone <pa...@kth.se> wrote (on Wed, 16 Nov 2016, 11:55):

Thanks for reviewing, Gyula.

One thing that is still up for discussion is whether we should remove the
old iterations API completely or simply mark it as deprecated until v2.0.
Also, I am not sure what the best process is now. We have the changes
ready. Should I copy the FLIP to the wiki and trigger the PRs, or wait a
few more days in case someone has objections?

@Stephan, what is your take on our interpretation of the approach you
suggested? Should we proceed, or is there anything that you do not find
nice?

Paris

On 15 Nov 2016, at 10:01, Gyula Fóra <gy...@apache.org> wrote:

Hi Paris,

I like the proposed changes to the iteration API. I think this cleans
things up in the Java API without any strict restriction (it was never a
problem in the Scala API).

The termination algorithm based on the proposed scoped loops seems to be
fairly simple and looks good :)

Cheers,
Gyula

Paris Carbone <pa...@kth.se> wrote (on Mon, 14 Nov 2016, 8:50):

That would be great Shi! Let's take that offline.

Anyone else interested in the iteration changes? It would be nice to
incorporate these into v1.2 if possible, so I am counting on your review asap.

cheers,
Paris

On Nov 14, 2016, at 2:59 AM, xiaogang.sxg <xiaogang.sxg@alibaba-inc.com> wrote:

Hi Paris

Unfortunately, the project is not public yet.
But I can provide you with a primitive implementation of the update
protocol in the paper. It is implemented in Storm. Since the protocol
assumes that the communication channels between different tasks are dual,
I think it is not easy to adapt it to Flink.

Regards
Xiaogang


On 12 Nov 2016, at 3:03 AM, Paris Carbone <parisc@kth.se> wrote:

Hi Shi,

Naiad/Timely Dataflow and other projects use global coordination, which
is very convenient for asynchronous progress tracking in general, but it
has some downsides in production systems that count on in-flight
transactional control mechanisms and rollback recovery guarantees. This
is why we generally prefer decentralized approaches (despite their own
downsides).

Regarding synchronous/structured iterations, this is a bit off topic and
they are a bit of a different story, as you already know.
We maintain a graph streaming (gelly-streams) library on Flink that you
might find interesting [1]. Vasia, another Flink committer, is also
working on that, among others.
You can keep an eye on it since we are planning to use this project as a
showcase for a new way of doing structured and fixpoint iterations on
streams in the future.

P.S. many thanks for sharing your publication, it was an interesting
read. Do you happen to have your source code public? We could most
certainly use it in a benchmark soon.

[1] https://github.com/vasia/gelly-streaming


On 11 Nov 2016, at 19:18, SHI Xiaogang <shixiaogangg@gmail.com> wrote:

Hi, Fouad

Thank you for the explanation. Now the centralized method seems correct
to me.
The passing of StatusUpdate events will lead to synchronous iterations,
and we are using the information in each iteration to terminate the
computation.

Actually, I prefer the centralized method because in many applications
the convergence may depend on some global statistics.
For example, a PageRank program may terminate the computation when 99%
of the vertices have converged.
I think those learning programs which cannot reach the fixed point
(oscillating around it) can benefit a lot from such features.
The decentralized method makes it hard to support such convergence
conditions.
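
As an illustration of the kind of global convergence condition described
above, here is a minimal sketch in Python. It is not Flink, Naiad or
Tornado code: the function names, the tolerance and the dictionary
representation of PageRank values are all assumptions made for this
example.

```python
def converged_fraction(previous, current, tolerance=1e-6):
    """Fraction of vertices whose rank moved by at most `tolerance`.

    `previous` and `current` map a vertex id to its rank in two consecutive
    iterations (hypothetical representation, chosen for the example).
    """
    if not current:
        return 1.0
    stable = sum(
        1
        for vertex, rank in current.items()
        # A vertex missing from `previous` is treated as not converged.
        if abs(rank - previous.get(vertex, float("inf"))) <= tolerance
    )
    return stable / len(current)


def should_terminate(previous, current, threshold=0.99, tolerance=1e-6):
    """Global condition: stop once `threshold` of all vertices are stable."""
    return converged_fraction(previous, current, tolerance) >= threshold
```

The check needs the ranks of all vertices at once, which is why it is
naturally evaluated at a single place rather than by each operator on its
own.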


Another concern is that Flink cannot produce periodic results in an
iteration over infinite data streams.
Take a concrete example. Given an edge stream constructing a graph, the
user may need the PageRank weight of each vertex in the graphs formed at
certain instants.
Currently Flink does not provide any input or iteration information to
users, making it hard for users to implement such real-time iterative
applications.
Such features are supported in both Naiad and Tornado. I think Flink
should support them as well.

What do you think?

Regards
Xiaogang


2016-11-11 19:27 GMT+08:00 Fouad ALi <fouad.alsayadi@gmail.com>:

Hi Shi,

It seems that you are referring to the centralized algorithm, which is no
longer the proposed version.
In the decentralized version (check the last doc) there is no master node
or global coordination involved.

Let us keep this discussion to the decentralized one if possible.

To answer your points on the previous approach, there is a catch in your
trace at t7. Here is what is happening:
- Head, as well as RS, will receive a 'BroadcastStatusUpdate' from the
runtime (see 2.1 in the steps).
- RS and Heads will broadcast a StatusUpdate event and will not notify
their status.
- When the StatusUpdate event gets back to the head, it will notify its
WORKING status.

Hope that answers your concern.

Best,
Fouad

On Nov 11, 2016, at 6:21 AM, SHI Xiaogang <shixiaogangg@gmail.com> wrote:

Hi Paris

I have several concerns about the correctness of the termination
protocol.
I think the termination protocol puts an end to the computation even when
the computation has not converged.

Suppose there exists a loop context constructed by an OP operator, a Head
operator and a Tail operator (illustrated in Figure 2 in the first
draft).
The stream only contains one record. OP will pass the record to its
downstream operators 10 times. In other words, the loop should iterate 10
times.

If I understood the protocol correctly, the following event sequence may
happen in the computation:
t1. RS emits Record to OP. Since RS has reached the "end-of-stream", the
system enters the Speculative Phase.
t2. OP receives Record and emits it to TAIL.
t3. HEAD receives the UpdateStatus event, and notifies with an IDLE
state.
t4. OP receives the UpdateStatus event from HEAD, and notifies with a
WORKING state.
t5. TAIL receives Record and emits it to HEAD.
t6. TAIL receives the UpdateStatus event from OP, and notifies with a
WORKING state.
t7. The system starts a new attempt. HEAD receives the UpdateStatus event
and notifies with an IDLE state. (Record is still in transit.)
t8. OP receives the UpdateStatus event from HEAD and notifies with an
IDLE state.
t9. TAIL receives the UpdateStatus event from OP and notifies with an
IDLE state.
t10. HEAD receives Record from TAIL and emits it to OP.
t11. System puts an end to the computation.

Though the computation is expected to iterate 10 times, it ends earlier.
The cause is that the communication channels MASTER=>HEAD and TAIL=>HEAD
are not synchronized.
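
The concern raised in this trace can be illustrated with a small Python
sketch. It only models the situation as described in the trace (the reply
from Fouad disputes that the protocol behaves this way), and all function
and variable names are assumptions made for the example.

```python
def looks_terminated(status_reports):
    """Naive out-of-band check: terminate when every operator reports IDLE."""
    return all(state == "IDLE" for state in status_reports.values())


def is_quiescent(status_reports, channels):
    """Stricter check: also require that no record is still in a channel."""
    return looks_terminated(status_reports) and not any(channels.values())


# State right after t9 in the trace: every operator reported IDLE for the
# second attempt, but the single Record is still travelling from TAIL to HEAD.
reports_after_t9 = {"HEAD": "IDLE", "OP": "IDLE", "TAIL": "IDLE"}
channels_after_t9 = {"TAIL=>HEAD": ["Record"], "HEAD=>OP": [], "OP=>TAIL": []}

premature = looks_terminated(reports_after_t9)
safe = is_quiescent(reports_after_t9, channels_after_t9)
```

Here `premature` is true while `safe` is false: status reports alone do
not account for records that are still inside a channel.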

I think the protocol follows the idea of the Chandy-Lamport algorithm to
determine a global state.
But the information of whether a node has processed any record since the
last request is not STABLE.
Hence I doubt the correctness of the protocol.

To determine the termination correctly, we need some information that is
stable.
In timely dataflow, Naiad collects the progress made in each iteration
and terminates the loop when little progress is made in an iteration
(identified by the timestamp vector).
The information is stable because the result of an iteration cannot be
changed by the execution of later iterations.

A similar method is also adopted in Tornado.
You may see my paper for more details about the termination of loops:
http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf

Regards
Xiaogang

2016-11-11 3:19 GMT+08:00 Paris Carbone <parisc@kth.se>:

Hi again Flink folks,

Here is our new proposal that addresses Job Termination - the loop fault
tolerance proposal will follow shortly.
As Stephan hinted, we need operators to be aware of their scope level.

Thus, it is time we make loops great again! :)

Part of this FLIP basically introduces a new functional, compositional
API for defining asynchronous loops for DataStreams.
This is coupled with a decentralized algorithm for job termination with
loops - along the lines of what Stephan described.
We are already working on the actual prototypes as you can observe in the
links of the doc.

Please let us know if you like (or don't like) it and why, in this mail
discussion.

https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-PfTHtq3173EhsAkpBoQ

cheers
Paris and Fouad

On 31 Oct 2016, at 12:53, Paris Carbone <parisc@kth.se> wrote:

Hey Stephan,

Thanks for looking into it!

+1 for breaking this up, will do that.

I can see your point and maybe it makes sense to introduce part of
scoping to incorporate support for nested loops (otherwise it can’t work).
Let us think about this a bit. We will share another draft with a more
detailed description of the approach you are suggesting asap.


On 27 Oct 2016, at 10:55, Stephan Ewen <sewen@apache.org> wrote:

How about we break this up into two FLIPs? There are after all two
orthogonal problems (termination, fault tolerance) with quite different
discussion states.

Concerning fault tolerance, I like the ideas.
For the termination proposal, I would like to iterate a bit more.

*Termination algorithm:*

My main concern here is the introduction of a termination coordinator and
any involvement of RPC messages when deciding termination.
That would be such a fundamental break with the current runtime
architecture, and it would make the currently very elegant and simple
model much more complicated and harder to maintain. Given that Flink's
runtime is complex enough, I would really like to avoid that.

The current runtime paradigm coordinates between operators strictly via
in-band events. RPC calls happen between operators and the master for
triggering and acknowledging execution and checkpoints.

I was wondering whether we can keep following that paradigm and still get
most of what you are proposing here. In some sense, all we need to do is
replace RPC calls with in-band events, and "decentralize" the coordinator
such that every operator can make its own termination decision by itself.

This is only a rough sketch, you probably need to flesh it out more.

- I assume that the OP in the diagram knows that it is in a loop and that
it is the one connected to the head and tail

- When OP receives an EndOfStream event from the regular source (RS), it
emits an "AttemptTermination" event downstream to the operators involved
in the loop. It attaches an attempt sequence number and memorizes that
- Tail and Head forward these events
- When OP receives the event back with the same attempt sequence number,
and no records came in the meantime, it shuts down and emits EndOfStream
downstream
- When other records came back between emitting the AttemptTermination
event and receiving it back, then it emits a new AttemptTermination event
with the next sequence number.
- This should terminate as soon as the loop is empty.
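
The steps above can be sketched as a small single-threaded simulation in
Python. This is only an illustration under simplifying assumptions, not
Flink code: the whole feedback path (OP to Tail to Head and back to OP) is
modelled as one FIFO queue, so that records and AttemptTermination events
travel in-band and in order, and each record is an integer saying how many
more times it still has to go around the loop. The name `run_loop` and the
tuple encoding are hypothetical.

```python
from collections import deque


def run_loop(initial_records):
    """Simulate OP deciding termination with in-band attempt events.

    initial_records: list of ints, the number of further passes each record
    from the regular source still needs after OP first processes it.
    Returns (number of attempts needed, number of records OP processed).
    """
    channel = deque()  # one FIFO shared by records and events (assumption)
    processed = 0

    # Records from the regular source are processed before EndOfStream.
    for remaining in initial_records:
        processed += 1
        if remaining > 0:
            channel.append(("record", remaining - 1))

    # EndOfStream from RS: emit the first AttemptTermination event.
    attempt = 1
    seen_record = False
    channel.append(("attempt", attempt))

    while True:
        kind, value = channel.popleft()
        if kind == "record":
            processed += 1
            seen_record = True
            if value > 0:
                channel.append(("record", value - 1))
        elif value == attempt and not seen_record:
            # The event came back and no record arrived in between:
            # the loop is empty, so OP can shut down.
            return attempt, processed
        else:
            # Records arrived in the meantime: retry with the next number.
            attempt += 1
            seen_record = False
            channel.append(("attempt", attempt))
```

For a single record that OP has to emit 10 times, `run_loop([10])` returns
`(11, 11)`: ten attempts are invalidated by the circulating record and the
eleventh finds the loop empty. The sketch relies on the event and the
records sharing one ordered channel; it says nothing about status messages
delivered out-of-band.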

Might this model even generalize to nested loops, where the
"AttemptTermination" event is scoped by the loop's nesting level?

Let me know what you think!


Best,
Stephan


On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <sewen@apache.org> wrote:

Hi!

I am still scanning it and compiling some comments. Give me a bit ;-)

Stephan


On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <parisc@kth.se> wrote:

Hey all,

Now that many of you have already scanned the document (judging from the
views) maybe it is time to give some feedback!
Did you like it? Would you suggest an improvement?

I would suggest not leaving this in the void. It has to do with
important properties that the system promises to provide.
Fouad and I will do our best to answer your questions and discuss this
further.

cheers
Paris

On 21 Oct 2016, at 08:54, Paris Carbone <parisc@kth.se> wrote:

Hello everyone,

Loops in Apache Flink have good potential to become a much more powerful
feature in future versions of Apache Flink.
There is generally high demand to make them usable and, first of all,
production-ready for upcoming releases.

As a first commitment we would like to propose FLIP-13 for consistent
processing with Loops.
We are also working on scoped loops for Q1 2017 which we can share if
there is enough interest.

For now, that is an improvement proposal that solves two pending major
issues:

1) The (not so trivial) problem of correct termination of jobs with
iterations
2) The applicability of the checkpointing algorithm to iterative dataflow
graphs.

We would really appreciate it if you go through the linked draft
(motivation and proposed changes) for FLIP-13 and point out comments,
preferably publicly in this devlist discussion before we go ahead and
update the wiki.

https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing

cheers

Paris and Fouad










Re: [DISCUSS] FLIP-14: Loops API and Termination

Posted by Paris Carbone <pa...@kth.se>.
That was fast!  Seems to be working. 
Thank you Fabian! 

> On 17 Nov 2016, at 13:58, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Hi Paris,
> 
> just gave you the permissions (I hope).
> Let me know if something does not work.
> 
> Cheers, Fabian
> 
> 2016-11-17 13:48 GMT+01:00 Paris Carbone <pa...@kth.se>:
> 
>> We do not have to schedule this for an early Flink release, just saying.
>> I would just like to get the changes out and you people can review it and
>> integrate it anytime at your own pace.
>> 
>> Who is the admin of the wiki? It would be nice to get write access.
>> 
>>> On 17 Nov 2016, at 13:45, Paris Carbone <pa...@kth.se> wrote:
>>> 
>>> Sounds like a plan!
>>> 
>>> Can someone grant me access to write in the wiki please?
>>> My username is “senorcarbone”.
>>> 
>>> Paris
>>> 
>>>> On 16 Nov 2016, at 14:30, Gyula Fóra <gy...@gmail.com> wrote:
>>>> 
>>>> I am not completely sure whether we should deprecate the old API for
>>>> 1.2 or remove it completely. Personally I am in favor of removing it; I
>>>> don't think it is a huge burden to move to the new one if it makes for a
>>>> much nicer user experience.
>>>> 
>>>> I think you can go ahead and add the FLIP to the wiki and open the PR, so
>>>> we can start the review, if you have it ready anyway.
>>>> 
>>>> Gyula
>>>> 
>>>> Paris Carbone <pa...@kth.se> wrote (on Wed, 16 Nov 2016, 11:55):
>>>> 
>>>>> Thanks for reviewing, Gyula.
>>>>> 
>>>>> One thing that is still up for discussion is whether we should remove
>>>>> the old iterations API completely or simply mark it as deprecated until
>>>>> v2.0.
>>>>> Also, I am not sure what the best process is now. We have the changes
>>>>> ready. Should I copy the FLIP to the wiki and trigger the PRs, or wait a
>>>>> few more days in case someone has objections?
>>>>> 
>>>>> @Stephan, what is your take on our interpretation of the approach you
>>>>> suggested? Should we proceed, or is there anything that you do not find
>>>>> nice?
>>>>> 
>>>>> Paris
>>>>> 
>>>>>> On 15 Nov 2016, at 10:01, Gyula Fóra <gy...@apache.org> wrote:
>>>>>> 
>>>>>> Hi Paris,
>>>>>> 
>>>>>> I like the proposed changes to the iteration API. I think this cleans
>>>>>> things up in the Java API without any strict restriction (it was never
>>>>>> a problem in the Scala API).
>>>>>> 
>>>>>> The termination algorithm based on the proposed scoped loops seems to
>>>>>> be fairly simple and looks good :)
>>>>>> 
>>>>>> Cheers,
>>>>>> Gyula
>>>>>> 
>>>>>> Paris Carbone <pa...@kth.se> wrote (on Mon, 14 Nov 2016, 8:50):
>>>>>> 
>>>>>>> That would be great Shi! Let's take that offline.
>>>>>>> 
>>>>>>> Anyone else interested in the iteration changes? It would be nice to
>>>>>>> incorporate these into v1.2 if possible, so I am counting on your
>>>>>>> review asap.
>>>>>>> 
>>>>>>> cheers,
>>>>>>> Paris
>>>>>>> 
>>>>>>> On Nov 14, 2016, at 2:59 AM, xiaogang.sxg <xiaogang.sxg@alibaba-inc.com> wrote:
>>>>>>> 
>>>>>>> Hi Paris
>>>>>>> 
>>>>>>> Unfortunately, the project is not public yet.
>>>>>>> But I can provide you with a primitive implementation of the update
>>>>>>> protocol in the paper. It is implemented in Storm. Since the protocol
>>>>>>> assumes that the communication channels between different tasks are
>>>>>>> dual, I think it is not easy to adapt it to Flink.
>>>>>>> 
>>>>>>> Regards
>>>>>>> Xiaogang
>>>>>>> 
>>>>>>> 
>>>>>>> 在 2016年11月12日,上午3:03,Paris Carbone <parisc@kth.se<mailto:parisc@
>> kth.se
>>>>>>> 
>>>>>>> 写道:
>>>>>>> 
>>>>>>> Hi Shi,
>>>>>>> 
>>>>>>> Naiad/Timely Dataflow and other projects use global coordination
>> which
>>>>> is
>>>>>>> very convenient for asynchronous progress tracking in general but it
>> has
>>>>>>> some downsides in a production systems that count on in-flight
>>>>>>> transactional control mechanisms and rollback recovery guarantees.
>> This
>>>>> is
>>>>>>> why we generally prefer decentralized approaches (despite their our
>>>>>>> downsides).
>>>>>>> 
>>>>>>> Regarding synchronous/structured iterations, this is a bit off topic,
>>>>>>> and they are a bit of a different story, as you already know.
>>>>>>> We maintain a graph streaming (gelly-streams) library on Flink that
>>>>>>> you might find interesting [1]. Vasia, another Flink committer, is
>>>>>>> also working on that, among others.
>>>>>>> You can keep an eye on it since we are planning to use this project
>>>>>>> as a showcase for a new way of doing structured and fixpoint
>>>>>>> iterations on streams in the future.
>>>>>>> 
>>>>>>> P.S. Many thanks for sharing your publication, it was an interesting
>>>>>>> read. Do you happen to have your source code public? We could most
>>>>>>> certainly use it in a benchmark soon.
>>>>>>> 
>>>>>>> [1] https://github.com/vasia/gelly-streaming
>>>>>>> 
>>>>>>> 
>>>>>>> On 11 Nov 2016, at 19:18, SHI Xiaogang <shixiaogangg@gmail.com> wrote:
>>>>>>> 
>>>>>>> Hi, Fouad
>>>>>>> 
>>>>>>> Thank you for the explanation. Now the centralized method seems
>>>>>>> correct to me.
>>>>>>> The passing of StatusUpdate events will lead to synchronous
>>>>>>> iterations, and we are using the information in each iteration to
>>>>>>> terminate the computation.
>>>>>>> 
>>>>>>> Actually, I prefer the centralized method because in many
>>>>>>> applications the convergence may depend on some global statistics.
>>>>>>> For example, a PageRank program may terminate the computation when
>>>>>>> 99% of the vertices have converged.
>>>>>>> I think learning programs that cannot reach the fixed point
>>>>>>> (oscillating around it) can benefit a lot from such features.
>>>>>>> The decentralized method makes it hard to support such convergence
>>>>>>> conditions.
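
A global convergence condition of the kind described above could be written
roughly as follows. This is only an illustrative Python sketch, not Flink,
Naiad or Tornado code: the function names, the epsilon tolerance and the
dictionary-based rank representation are all assumptions made for the example.

```python
def fraction_converged(previous, current, epsilon=1e-4):
    """Share of vertices whose rank moved by at most epsilon (hypothetical helper)."""
    if not current:
        return 1.0
    stable = sum(
        1 for vertex, rank in current.items()
        if abs(rank - previous.get(vertex, 0.0)) <= epsilon
    )
    return stable / len(current)


def should_terminate(previous, current, threshold=0.99, epsilon=1e-4):
    """Global decision: stop once at least `threshold` of the vertices are stable."""
    return fraction_converged(previous, current, epsilon) >= threshold


if __name__ == "__main__":
    old = {v: 1.0 for v in range(100)}
    new = dict(old)
    new[0] = 2.0  # two vertices are still changing
    new[1] = 2.0
    print(fraction_converged(old, new))
    print(should_terminate(old, new, threshold=0.95))
```

The decision needs a statistic over all vertices in one place, which is why
it fits a centralized scheme more naturally than a purely decentralized one.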
>>>>>>> 
>>>>>>> 
>>>>>>> Another concern is that Flink cannot produce periodic results in
>>>>>>> an iteration over infinite data streams.
>>>>>>> Take a concrete example. Given an edge stream constructing a graph,
>>>>>>> the user may need the PageRank weight of each vertex in the graphs
>>>>>>> formed at certain instants.
>>>>>>> Currently Flink does not provide any input or iteration information
>>>>>>> to users, making it hard for users to implement such real-time
>>>>>>> iterative applications.
>>>>>>> Such features are supported in both Naiad and Tornado. I think Flink
>>>>>>> should support them as well.
>>>>>>> 
>>>>>>> What do you think?
>>>>>>> 
>>>>>>> Regards
>>>>>>> Xiaogang
>>>>>>> 
>>>>>>> 
>>>>>>> 2016-11-11 19:27 GMT+08:00 Fouad ALi <fouad.alsayadi@gmail.com>:
>>>>>>> 
>>>>>>> Hi Shi,
>>>>>>> 
>>>>>>> It seems that you are referring to the centralized algorithm, which
>>>>>>> is no longer the proposed version.
>>>>>>> In the decentralized version (check the latest doc) there is no
>>>>>>> master node or global coordination involved.
>>>>>>>
>>>>>>> Let us keep this discussion to the decentralized one if possible.
>>>>>>>
>>>>>>> To answer your points on the previous approach, there is a catch in
>>>>>>> your trace at t7. Here is what is happening:
>>>>>>> - Head, as well as RS, will receive a 'BroadcastStatusUpdate' from
>>>>>>> the runtime (see 2.1 in the steps).
>>>>>>> - RS and Head will broadcast a StatusUpdate event and will not
>>>>>>> notify their status.
>>>>>>> - When the StatusUpdate event gets back to the head, it will notify
>>>>>>> its WORKING status.
>>>>>>> Hope that answers your concern.
>>>>>>> 
>>>>>>> Best,
>>>>>>> Fouad
>>>>>>> 
>>>>>>> On Nov 11, 2016, at 6:21 AM, SHI Xiaogang <shixiaogangg@gmail.com> wrote:
>>>>>>> 
>>>>>>> Hi Paris
>>>>>>> 
>>>>>>> I have several concerns about the correctness of the termination
>>>>>>> protocol.
>>>>>>> I think the termination protocol puts an end to the computation even
>>>>>>> when the computation has not converged.
>>>>>>>
>>>>>>> Suppose there exists a loop context constructed by an OP operator, a
>>>>>>> Head operator and a Tail operator (illustrated in Figure 2 in the
>>>>>>> first draft).
>>>>>>> The stream only contains one record. OP will pass the record to its
>>>>>>> downstream operators 10 times. In other words, the loop should
>>>>>>> iterate 10 times.
>>>>>>> 
>>>>>>> If I understood the protocol correctly, the following event sequence
>>>>>>> may happen in the computation:
>>>>>>> t1.  RS emits Record to OP. Since RS has reached the "end-of-stream",
>>>>>>>      the system enters the Speculative Phase.
>>>>>>> t2.  OP receives Record and emits it to TAIL.
>>>>>>> t3.  HEAD receives the UpdateStatus event and notifies with an IDLE
>>>>>>>      state.
>>>>>>> t4.  OP receives the UpdateStatus event from HEAD and notifies with
>>>>>>>      a WORKING state.
>>>>>>> t5.  TAIL receives Record and emits it to HEAD.
>>>>>>> t6.  TAIL receives the UpdateStatus event from OP and notifies with
>>>>>>>      a WORKING state.
>>>>>>> t7.  The system starts a new attempt. HEAD receives the UpdateStatus
>>>>>>>      event and notifies with an IDLE state. (Record is still in
>>>>>>>      transit.)
>>>>>>> t8.  OP receives the UpdateStatus event from HEAD and notifies with
>>>>>>>      an IDLE state.
>>>>>>> t9.  TAIL receives the UpdateStatus event from OP and notifies with
>>>>>>>      an IDLE state.
>>>>>>> t10. HEAD receives Record from TAIL and emits it to OP.
>>>>>>> t11. The system puts an end to the computation.
>>>>>>> 
>>>>>>> Though the computation is expected to iterate 10 times, it ends
>>>>>>> earlier.
>>>>>>> The cause is that the communication channels MASTER=>HEAD and
>>>>>>> TAIL=>HEAD are not synchronized.
>>>>>>>
>>>>>>> I think the protocol follows the idea of the Chandy-Lamport
>>>>>>> algorithm to determine a global state.
>>>>>>> But the information of whether a node has processed any record
>>>>>>> since the last request is not STABLE.
>>>>>>> Hence I doubt the correctness of the protocol.
>>>>>>> 
>>>>>>> To determine the termination correctly, we need some information
>>>>>>> that is stable.
>>>>>>> In timely dataflow, Naiad collects the progress made in each
>>>>>>> iteration and terminates the loop when little progress is made in an
>>>>>>> iteration (identified by the timestamp vector).
>>>>>>> The information is stable because the result of an iteration cannot
>>>>>>> be changed by the execution of later iterations.
>>>>>>>
>>>>>>> A similar method is also adopted in Tornado.
>>>>>>> You may see my paper for more details about the termination of loops:
>>>>>>> http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf
>>>>>>> 
>>>>>>> Regards
>>>>>>> Xiaogang
>>>>>>> 
>>>>>>> 2016-11-11 3:19 GMT+08:00 Paris Carbone <parisc@kth.se>:
>>>>>>> 
>>>>>>> Hi again Flink folks,
>>>>>>> 
>>>>>>> Here is our new proposal that addresses Job Termination - the loop
>>>>>>> fault tolerance proposal will follow shortly.
>>>>>>> As Stephan hinted, we need operators to be aware of their scope level.
>>>>>>>
>>>>>>> Thus, it is time we make loops great again! :)
>>>>>>>
>>>>>>> Part of this FLIP basically introduces a new functional, compositional
>>>>>>> API for defining asynchronous loops for DataStreams.
>>>>>>> This is coupled with a decentralized algorithm for job termination
>>>>>>> with loops - along the lines of what Stephan described.
>>>>>>> We are already working on the actual prototypes as you can observe in
>>>>>>> the links of the doc.
>>>>>>>
>>>>>>> Please let us know if you like (or don't like) it and why, in this
>>>>>>> mail discussion.
>>>>>>>
>>>>>>> https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-PfTHtq3173EhsAkpBoQ
>>>>>>> 
>>>>>>> cheers
>>>>>>> Paris and Fouad
>>>>>>> 
>>>>>>> On 31 Oct 2016, at 12:53, Paris Carbone <parisc@kth.se> wrote:
>>>>>>> 
>>>>>>> Hey Stephan,
>>>>>>> 
>>>>>>> Thanks for looking into it!
>>>>>>> 
>>>>>>> +1 for breaking this up, will do that.
>>>>>>> 
>>>>>>> I can see your point and maybe it makes sense to introduce part of
>>>>>>> scoping to incorporate support for nested loops (otherwise it can’t
>>>>>>> work).
>>>>>>> Let us think about this a bit. We will share another draft with a more
>>>>>>> detailed description of the approach you are suggesting asap.
>>>>>>> 
>>>>>>> 
>>>>>>> On 27 Oct 2016, at 10:55, Stephan Ewen <sewen@apache.org> wrote:
>>>>>>> 
>>>>>>> How about we break this up into two FLIPs? There are after all two
>>>>>>> orthogonal problems (termination, fault tolerance) with quite
>>>>>>> different discussion states.
>>>>>>> 
>>>>>>> Concerning fault tolerance, I like the ideas.
>>>>>>> For the termination proposal, I would like to iterate a bit more.
>>>>>>> 
>>>>>>> *Termination algorithm:*
>>>>>>> 
>>>>>>> My main concern here is the introduction of a termination coordinator
>>>>>>> and any involvement of RPC messages when deciding termination.
>>>>>>> That would be such a fundamental break with the current runtime
>>>>>>> architecture, and it would make the currently very elegant and simple
>>>>>>> model much more complicated and harder to maintain. Given that Flink's
>>>>>>> runtime is complex enough, I would really like to avoid that.
>>>>>>>
>>>>>>> The current runtime paradigm coordinates between operators strictly
>>>>>>> via in-band events. RPC calls happen between operators and the master
>>>>>>> for triggering and acknowledging execution and checkpoints.
>>>>>>>
>>>>>>> I was wondering whether we can keep following that paradigm and still
>>>>>>> get most of what you are proposing here. In some sense, all we need to
>>>>>>> do is replace RPC calls with in-band events, and "decentralize" the
>>>>>>> coordinator such that every operator can make its own termination
>>>>>>> decision by itself.
>>>>>>> 
>>>>>>> This is only a rough sketch, you probably need to flesh it out more.
>>>>>>> 
>>>>>>> - I assume that the OP in the diagram knows that it is in a loop and
>>>>>>> that it is the one connected to the head and tail
>>>>>>>
>>>>>>> - When OP receives an EndOfStream event from the regular source (RS),
>>>>>>> it emits an "AttemptTermination" event downstream to the operators
>>>>>>> involved in the loop. It attaches an attempt sequence number and
>>>>>>> memorizes that
>>>>>>> - Tail and Head forward these events
>>>>>>> - When OP receives the event back with the same attempt sequence
>>>>>>> number, and no records came in the meantime, it shuts down and emits
>>>>>>> EndOfStream downstream
>>>>>>> - When other records came back between emitting the
>>>>>>> AttemptTermination event and receiving it back, then it emits a new
>>>>>>> AttemptTermination event with the next sequence number.
>>>>>>> - This should terminate as soon as the loop is empty.
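
The steps above can be sketched as a small simulation. This is a hedged,
simplified Python model and not Flink code: it assumes a single OP and
collapses OP -> Tail -> Head -> OP into one FIFO channel, and the names
`run_loop`, `passes_per_record` and the tuple-based events are hypothetical,
chosen only for this illustration.

```python
from collections import deque


def run_loop(records, passes_per_record):
    """Simulate OP -> Tail -> Head -> OP as a single FIFO feedback channel.

    Returns (emissions, attempts): how many records OP emitted into the loop
    and how many AttemptTermination events it needed before shutting down.
    """
    channel = deque()
    emissions = 0

    # Regular input from RS: OP emits every record into the loop once.
    for value in records:
        channel.append(("record", value, 1))
        emissions += 1

    # EndOfStream from RS: OP emits the first attempt and memorizes its number.
    current_seq = 0
    attempts = 1
    records_since_attempt = 0
    channel.append(("attempt", current_seq))

    while channel:
        event = channel.popleft()  # Tail and Head only forward, in FIFO order
        if event[0] == "record":
            _, value, passes = event
            records_since_attempt += 1
            if passes < passes_per_record:
                # The record still needs another pass through the loop.
                channel.append(("record", value, passes + 1))
                emissions += 1
        elif event[1] == current_seq:
            if records_since_attempt == 0:
                # Loop is empty: OP would shut down and emit EndOfStream.
                return emissions, attempts
            # Records came back in the meantime: retry with the next number.
            current_seq += 1
            attempts += 1
            records_since_attempt = 0
            channel.append(("attempt", current_seq))

    raise RuntimeError("channel drained without a successful attempt")
```

Because the attempt event travels in-band behind the records on the same
FIFO channel, a record that is still in flight always reaches OP before the
attempt does. In the one-record, ten-pass example from this thread,
`run_loop(["r"], 10)` therefore lets OP emit the record all 10 times before
an attempt finally comes back with no records in between.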
>>>>>>> 
>>>>>>> Might this model even generalize to nested loops, where the
>>>>>>> "AttemptTermination" event is scoped by the loop's nesting level?
>>>>>>> 
>>>>>>> Let me know what you think!
>>>>>>> 
>>>>>>> 
>>>>>>> Best,
>>>>>>> Stephan
>>>>>>> 
>>>>>>> 
>>>>>>> On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <sewen@apache.org> wrote:
>>>>>>> 
>>>>>>> Hi!
>>>>>>> 
>>>>>>> I am still scanning it and compiling some comments. Give me a bit ;-)
>>>>>>> 
>>>>>>> Stephan
>>>>>>> 
>>>>>>> 
>>>>>>> On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <parisc@kth.se> wrote:
>>>>>>> 
>>>>>>> Hey all,
>>>>>>> 
>>>>>>> Now that many of you have already scanned the document (judging from
>>>>>>> the views) maybe it is time to give some feedback!
>>>>>>> Did you like it? Would you suggest an improvement?
>>>>>>>
>>>>>>> I would suggest not leaving this in the void. It has to do with
>>>>>>> important properties that the system promises to provide.
>>>>>>> Fouad and I will do our best to answer your questions and discuss
>>>>>>> this further.
>>>>>>> 
>>>>>>> cheers
>>>>>>> Paris
>>>>>>> 
>>>>>>> On 21 Oct 2016, at 08:54, Paris Carbone <parisc@kth.se> wrote:
>>>>>>> 
>>>>>>> Hello everyone,
>>>>>>> 
>>>>>>> Loops in Apache Flink have good potential to become a much more
>>>>>>> powerful feature in future versions of Apache Flink.
>>>>>>> There is generally high demand to make them usable and, first of all,
>>>>>>> production-ready for upcoming releases.
>>>>>>> 
>>>>>>> As a first commitment we would like to propose FLIP-13 for consistent
>>>>>>> processing with Loops.
>>>>>>> We are also working on scoped loops for Q1 2017 which we can share if
>>>>>>> there is enough interest.
>>>>>>> 
>>>>>>> For now, that is an improvement proposal that solves two pending
>>>>>>> major issues:
>>>>>>>
>>>>>>> 1) The (not so trivial) problem of correct termination of jobs with
>>>>>>> iterations
>>>>>>> 2) The applicability of the checkpointing algorithm to iterative
>>>>>>> dataflow graphs.
>>>>>>>
>>>>>>> We would really appreciate it if you go through the linked draft
>>>>>>> (motivation and proposed changes) for FLIP-13 and point out comments,
>>>>>>> preferably publicly in this devlist discussion before we go ahead and
>>>>>>> update the wiki.
>>>>>>>
>>>>>>> https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing
>>>>>>> 
>>>>>>> cheers
>>>>>>> 
>>>>>>> Paris and Fouad
>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>> 
>> 
>> 


Re: [DISCUSS] FLIP-14: Loops API and Termination

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Paris,

just gave you the permissions (I hope).
Let me know if something does not work.

Cheers, Fabian

2016-11-17 13:48 GMT+01:00 Paris Carbone <pa...@kth.se>:

> We do not have to schedule this for an early Flink release, just saying.
> I would just like to get the changes out and you people can review it and
> integrate it anytime at your own pace.
>
> Who is the admin of the wiki? It would be nice to get write access.
>
> > On 17 Nov 2016, at 13:45, Paris Carbone <pa...@kth.se> wrote:
> >
> > Sounds like a plan!
> >
> > Can someone grant me access to write in the wiki please?
> > My username is “senorcarbone”.
> >
> > Paris
> >
> >> On 16 Nov 2016, at 14:30, Gyula Fóra <gy...@gmail.com> wrote:
> >>
> >> I am not completely sure whether we should deprecate the old API for 1.2
> >> or remove it completely. Personally I am in favor of removing it; I don't
> >> think it is a huge burden to move to the new one if it makes for a much
> >> nicer user experience.
> >>
> >> I think you can go ahead, add the FLIP to the wiki and open the PR so we
> >> can start the review if you have it ready anyway.
> >>
> >> Gyula
> >>
> >> Paris Carbone <pa...@kth.se> wrote (on Wed, Nov 16, 2016, 11:55):
> >>
> >>> Thanks for reviewing, Gyula.
> >>>
> >>> One thing that is still up for discussion is whether we should
> >>> completely remove the old iterations API or simply mark it as deprecated
> >>> till v2.0.
> >>> Also, not sure what the best process is now. We have the changes ready.
> >>> Should I copy the FLIP to the wiki and trigger the PRs or wait for a few
> >>> more days in case someone has objections?
> >>>
> >>> @Stephan, what is your take on our interpretation of the approach you
> >>> suggested? Should we proceed or is there anything that you do not find
> >>> nice?
> >>>
> >>> Paris
> >>>
> >>>> On 15 Nov 2016, at 10:01, Gyula Fóra <gy...@apache.org> wrote:
> >>>>
> >>>> Hi Paris,
> >>>>
> >>>> I like the proposed changes to the iteration API, this cleans up things
> >>>> in the Java API without any strict restriction I think (it was never a
> >>>> problem in the Scala API).
> >>>>
> >>>> The termination algorithm based on the proposed scoped loops seems to be
> >>>> fairly simple and looks good :)
> >>>>
> >>>> Cheers,
> >>>> Gyula
> >>>>
> >>>> Paris Carbone <pa...@kth.se> ezt írta (időpont: 2016. nov. 14., H,
> >>> 8:50):
> >>>>
> >>>>> That would be great Shi! Let's take that offline.
> >>>>>
> >>>>> Anyone else interested in the iteration changes? It would be nice to
> >>>>> incorporate these to v1.2 if possible so I count on your review asap.
> >>>>>
> >>>>> cheers,
> >>>>> Paris
> >>>>>
> >>>>> On Nov 14, 2016, at 2:59 AM, xiaogang.sxg <
> xiaogang.sxg@alibaba-inc.com
> >>>>> <ma...@alibaba-inc.com>> wrote:
> >>>>>
> >>>>> Hi Paris
> >>>>>
> >>>>> Unfortunately, the project is not public yet.
> >>>>> But i can provide you a primitive implementation of the update
> protocol
> >>> in
> >>>>> the paper. It’s implemented in Storm. Since the protocol assumes the
> >>>>> communication channels between different tasks are dual, i think it’s
> >>> not
> >>>>> easy to adapt it to Flink.
> >>>>>
> >>>>> Regards
> >>>>> Xiaogang
> >>>>>
> >>>>>
> >>>>> 在 2016年11月12日,上午3:03,Paris Carbone <parisc@kth.se<mailto:parisc@
> kth.se
> >>>>>
> >>>>> 写道:
> >>>>>
> >>>>> Hi Shi,
> >>>>>
> >>>>> Naiad/Timely Dataflow and other projects use global coordination
> which
> >>> is
> >>>>> very convenient for asynchronous progress tracking in general but it
> has
> >>>>> some downsides in a production systems that count on in-flight
> >>>>> transactional control mechanisms and rollback recovery guarantees.
> This
> >>> is
> >>>>> why we generally prefer decentralized approaches (despite their our
> >>>>> downsides).
> >>>>>
> >>>>> Regarding synchronous/structured iterations, this is a bit off topic
> and
> >>>>> they are a bit of a different story as you already know.
> >>>>> We maintain a graph streaming (gelly-streams) library on Flink that
> you
> >>>>> might find interesting [1]. Vasia, another Flink committer is also
> >>> working
> >>>>> on that among others.
> >>>>> You can keep an eye on it since we are planning to use this project
> as a
> >>>>> showcase for a new way of doing structured and fixpoint iterations on
> >>>>> streams in the future.
> >>>>>
> >>>>> P.S. many thanks for sharing your publication, it was an interesting
> >>> read.
> >>>>> Do you happen to have your source code public? We could most
> certainly
> >>> use
> >>>>> it in an benchmark soon.
> >>>>>
> >>>>> [1] https://github.com/vasia/gelly-streaming
> >>>>>
> >>>>>
> >>>>> On 11 Nov 2016, at 19:18, SHI Xiaogang <shixiaogangg@gmail.com<
> mailto:
> >>>>> shixiaogangg@gmail.com><ma...@gmail.com>> wrote:
> >>>>>
> >>>>> Hi, Fouad
> >>>>>
> >>>>> Thank you for the explanation. Now the centralized method seems
> correct
> >>> to
> >>>>> me.
> >>>>> The passing of StatusUpdate events will lead to synchronous
> iterations
> >>> and
> >>>>> we are using the information in each iterations to terminate the
> >>>>> computation.
> >>>>>
> >>>>> Actually, i prefer the centralized method because in many
> applications,
> >>> the
> >>>>> convergence may depend on some global statistics.
> >>>>> For example, a PageRank program may terminate the computation when
> 99%
> >>>>> vertices are converged.
> >>>>> I think those learning programs which cannot reach the fixed-point
> >>>>> (oscillating around the fixed-point) can benefit a lot from such
> >>> features.
> >>>>> The decentralized method makes it hard to support such convergence
> >>>>> conditions.
> >>>>>
> >>>>>
> >>>>> Another concern is that Flink cannot produce periodical results in
> the
> >>>>> iteration over infinite data streams.
> >>>>> Take a concrete example. Given an edge stream constructing a graph,
> the
> >>>>> user may need the PageRank weight of each vertex in the graphs
> formed at
> >>>>> certain instants.
> >>>>> Currently Flink does not provide any input or iteration information
> to
> >>>>> users, making users hard to implement such real-time iterative
> >>>>> applications.
> >>>>> Such features are supported in both Naiad and Tornado. I think Flink
> >>> should
> >>>>> support it as well.
> >>>>>
> >>>>> What do you think?
> >>>>>
> >>>>> Regards
> >>>>> Xiaogang
> >>>>>
> >>>>>
> >>>>> 2016-11-11 19:27 GMT+08:00 Fouad ALi <fouad.alsayadi@gmail.com<
> mailto:
> >>>>> fouad.alsayadi@gmail.com><ma...@gmail.com>>:
> >>>>>
> >>>>> Hi Shi,
> >>>>>
> >>>>> It seems that you are referring to the centralized algorithm which
> is no
> >>>>> longer the proposed version.
> >>>>> In the decentralized version (check last doc) there is no master
> node or
> >>>>> global coordination involved.
> >>>>>
> >>>>> Let us keep this discussion to the decentralized one if possible.
> >>>>>
> >>>>> To answer your points on the previous approach, there is a catch in
> your
> >>>>> trace at t7. Here is what is happening :
> >>>>> - Head,as well as RS, will receive  a 'BroadcastStatusUpdate' from
> >>>>> runtime (see 2.1 in the steps).
> >>>>> - RS and Heads will broadcast StatusUpdate  event and will not notify
> >>> its
> >>>>> status.
> >>>>> - When StatusUpdate event gets back to the head it will notify its
> >>>>> WORKING  status.
> >>>>>
> >>>>> Hope that answers your concern.
> >>>>>
> >>>>> Best,
> >>>>> Fouad
> >>>>>
> >>>>> On Nov 11, 2016, at 6:21 AM, SHI Xiaogang <shixiaogangg@gmail.com
> >>> <mailto:
> >>>>> shixiaogangg@gmail.com><ma...@gmail.com>>
> >>>>> wrote:
> >>>>>
> >>>>> Hi Paris
> >>>>>
> >>>>> I have several concerns about the correctness of the termination
> >>>>> protocol.
> >>>>> I think the termination protocol put an end to the computation even
> when
> >>>>> the computation has not converged.
> >>>>>
> >>>>> Suppose there exists a loop context constructed by a OP operator, a
> Head
> >>>>> operator and a Tail operator (illustrated in Figure 2 in the first
> >>>>> draft).
> >>>>> The stream only contains one record. OP will pass the record to its
> >>>>> downstream operators 10 times. In other words, the loop should
> iterate
> >>> 10
> >>>>> times.
> >>>>>
> >>>>> If I understood the protocol correctly, the following event sequence
> may
> >>>>> happen in the computation:
> >>>>> t1:  RS emits Record to OP. Since RS has reached the "end-of-stream",
> >>> the
> >>>>> system enters into Speculative Phase.
> >>>>> t2:  OP receives Record and emits it to TAIL.
> >>>>> t3:  HEAD receives the UpdateStatus event, and notifies with an IDLE
> >>>>> state.
> >>>>> t4. OP receives the UpdateStatus event from HEAD, and notifies with
> an
> >>>>> WORKING state.
> >>>>> t5. TAIL receives Record and emits it to HEAD.
> >>>>> t6. TAIL receives the UpdateStatus event from OP, and notifies with
> an
> >>>>> WORKING state.
> >>>>> t7. The system starts a new attempt. HEAD receives the UpdateStatus
> >>> event
> >>>>> and notifies with an IDLE state.  (Record is still in transition.)
> >>>>> t8. OP receives the UpdateStatus event from HEAD and notifies with an
> >>>>> IDLE
> >>>>> state.
> >>>>> t9. TAIL receives the UpdateStatus event from OP and notifies with an
> >>>>> IDLE
> >>>>> state.
> >>>>> t10. HEAD receives Record from TAIL and emits it to OP.
> >>>>> t11. System puts an end to the computation.
> >>>>>
> >>>>> Though the computation is expected to iterate 10 times, it ends
> earlier.
> >>>>> The cause is that the communication channels of MASTER=>HEAD and
> >>>>> TAIL=>HEAD
> >>>>> are not synchronized.
> >>>>>
> >>>>> I think the protocol follows the idea of the Chandy-Lamport
> algorithm to
> >>>>> determine a global state.
> >>>>> But the information of whether a node has processed any record to
> since
> >>>>> the
> >>>>> last request is not STABLE.
> >>>>> Hence i doubt the correctness of the protocol.
> >>>>>
> >>>>> To determine the termination correctly, we need some information
> that is
> >>>>> stable.
> >>>>> In timelyflow, Naiad collects the progress made in each iteration and
> >>>>> terminates the loop when a little progress is made in an iteration
> >>>>> (identified by the timestamp vector).
> >>>>> The information is stable because the result of an iteration cannot
> be
> >>>>> changed by the execution of later iterations.
> >>>>>
> >>>>> A similar method is also adopted in Tornado.
> >>>>> You may see my paper for more details about the termination of loops:
> >>>>> http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf <
> >>>>> http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf>
> >>>>>
> >>>>> Regards
> >>>>> Xiaogang
> >>>>>
> >>>>> 2016-11-11 3:19 GMT+08:00 Paris Carbone <parisc@kth.se<mailto:
> >>>>> parisc@kth.se><ma...@kth.se> <mailto:
> >>>>> parisc@kth.se<ma...@kth.se>>>:
> >>>>>
> >>>>> Hi again Flink folks,
> >>>>>
> >>>>> Here is our new proposal that addresses Job Termination - the loop
> fault
> >>>>> tolerance proposal will follow shortly.
> >>>>> As Stephan hinted, we need operators to be aware of their scope
> level.
> >>>>>
> >>>>> Thus, it is time we make loops great again! :)
> >>>>>
> >>>>> Part of this FLIP basically introduces a new functional,
> compositional
> >>>>> API
> >>>>> for defining asynchronous loops for DataStreams.
> >>>>> This is coupled with a decentralized algorithm for job termination
> with
> >>>>> loops - along the lines of what Stephan described.
> >>>>> We are already working on the actual prototypes as you can observe in
> >>>>> the
> >>>>> links of the doc.
> >>>>>
> >>>>> Please let us know if you like (or don't like) it and why, in this
> mail
> >>>>> discussion.
> >>>>>
> >>>>> https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-
> >>>>> PfTHtq3173EhsAkpBoQ
> >>>>>
> >>>>> cheers
> >>>>> Paris and Fouad
> >>>>>
> >>>>> On 31 Oct 2016, at 12:53, Paris Carbone <parisc@kth.se<mailto:
> >>>>> parisc@kth.se> <mailto:
> >>>>> parisc@kth.se<ma...@kth.se>><mailto:
> >>> parisc@
> >>>>> kth.se<http://kth.se/><http://kth.se<http://kth.se/>> <
> http://kth.se/
> >>>>>>
> >>>>> wrote:
> >>>>>
> >>>>> Hey Stephan,
> >>>>>
> >>>>> Thanks for looking into it!
> >>>>>
> >>>>> +1 for breaking this up, will do that.
> >>>>>
> >>>>> I can see your point and maybe it makes sense to introduce part of
> >>>>> scoping
> >>>>> to incorporate support for nested loops (otherwise it can’t work).
> >>>>> Let us think about this a bit. We will share another draft for a more
> >>>>> detail description of the approach you are suggesting asap.
> >>>>>
> >>>>>
> >>>>> On 27 Oct 2016, at 10:55, Stephan Ewen <sewen@apache.org<mailto:
> >>>>> sewen@apache.org><ma...@apache.org> <mailto:
> >>>>> sewen@apache.org<ma...@apache.org><mailto:sewen@apache.org
> >>>>>>> <mailto:sewen
> >>>>> @apache.org<http://apache.org/>>> wrote:
> >>>>>
> >>>>> How about we break this up into two FLIPs? There are after all two
> >>>>> orthogonal problems (termination, fault tolerance) with quite
> different
> >>>>> discussion states.
> >>>>>
> >>>>> Concerning fault tolerance, I like the ideas.
> >>>>> For the termination proposal, I would like to iterate a bit more.
> >>>>>
> >>>>> *Termination algorithm:*
> >>>>>
> >>>>> My main concern here is the introduction of a termination coordinator
> >>>>> and
> >>>>> any involvement of RPC messages when deciding termination.
> >>>>> That would be such a fundamental break with the current runtime
> >>>>> architecture, and it would make the currently very elegant and simple
> >>>>> model
> >>>>> much more complicated and harder to maintain. Given that Flink's
> >>>>> runtime is
> >>>>> complex enough, I would really like to avoid that.
> >>>>>
> >>>>> The current runtime paradigm coordinates between operators strictly
> via
> >>>>> in-band events. RPC calls happen between operators and the master for
> >>>>> triggering and acknowledging execution and checkpoints.
> >>>>>
> >>>>> I was wondering whether we can keep following that paradigm and still
> >>>>> get
> >>>>> most of what you are proposing here. In some sense, all we need to
> do is
> >>>>> replace RPC calls with in-band events, and "decentralize" the
> >>>>> coordinator
> >>>>> such that every operator can make its own termination decision by
> >>>>> itself.
> >>>>>
> >>>>> This is only a rough sketch, you probably need to flesh it out more.
> >>>>>
> >>>>> - I assume that the OP in the diagram knows that it is in a loop and
> >>>>> that
> >>>>> it is the one connected to the head and tail
> >>>>>
> >>>>> - When OP receives and EndOfStream Event from the regular source
> (RS),
> >>>>> it
> >>>>> emits an "AttemptTermination" event downstream to the operators
> >>>>> involved in
> >>>>> the loop. It attaches an attempt sequence number and memorizes that
> >>>>> - Tail and Head forward these events
> >>>>> - When OP receives the event back with the same attempt sequence
> number,
> >>>>> and no records came in the meantime, it shuts down and emits
> EndOfStream
> >>>>> downstream
> >>>>> - When other records came back between emitting the
> AttemptTermination
> >>>>> event and receiving it back, then it emits a new AttemptTermination
> >>>>> event
> >>>>> with the next sequence number.
> >>>>> - This should terminate as soon as the loop is empty.
> >>>>>
> >>>>> Might this model even generalize to nested loops, where the
> >>>>> "AttemptTermination" event is scoped by the loop's nesting level?
> >>>>>
> >>>>> Let me know what you think!
> >>>>>
> >>>>>
> >>>>> Best,
> >>>>> Stephan
> >>>>>
> >>>>>
> >>>>> On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <sewen@apache.org
> >>> <mailto:
> >>>>> sewen@apache.org><ma...@apache.org>
> >>>>> <ma...@apache.org><mailto:
> >>>>> sewen@apache.org<ma...@apache.org>
> >>>>> <ma...@apache.org>>> wrote:
> >>>>>
> >>>>> Hi!
> >>>>>
> >>>>> I am still scanning it and compiling some comments. Give me a bit ;-)
> >>>>>
> >>>>> Stephan
> >>>>>
> >>>>>
> >>>>> On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <parisc@kth.se
> <mailto:
> >>>>> parisc@kth.se><ma...@kth.se> <mailto:
> >>>>> parisc@kth.se<ma...@kth.se>><mailto:
> >>>>> parisc@kth.se<ma...@kth.se> <mailto:
> >>>>> parisc@kth.se>>> wrote:
> >>>>>
> >>>>> Hey all,
> >>>>>
> >>>>> Now that many of you have already scanned the document (judging from
> the
> >>>>> views) maybe it is time to give back some feedback!
> >>>>> Did you like it? Would you suggest an improvement?
> >>>>>
> >>>>> I would suggest not to leave this in the void. It has to do with
> >>>>> important properties that the system promises to provide.
> >>>>> Me and Fouad will do our best to answer your questions and discuss
> this
> >>>>> further.
> >>>>>
> >>>>> cheers
> >>>>> Paris
> >>>>>
> >>>>> On 21 Oct 2016, at 08:54, Paris Carbone <parisc@kth.se<mailto:
> >>>>> parisc@kth.se><ma...@kth.se> <mailto:
> >>>>> parisc@kth.se<ma...@kth.se>><mailto:
> >>> parisc@
> >>>>> kth.se<http://kth.se/><http://kth.se<http://kth.se/>> <
> http://kth.se/
> >>>>>>> <mailto:parisc@k
> >>>>> th.se<http://th.se/><http://th.se<http://th.se/>> <http://th.se/><
> >>>>> http://th.se<http://th.se/> <http://th.se/>>>> wrote:
> >>>>>
> >>>>> Hello everyone,
> >>>>>
> >>>>> Loops in Apache Flink have a good potential to become a much more
> >>>>> powerful thing in future version of Apache Flink.
> >>>>> There is generally high demand to make them usable and first of all
> >>>>> production-ready for upcoming releases.
> >>>>>
> >>>>> As a first commitment we would like to propose FLIP-13 for consistent
> >>>>> processing with Loops.
> >>>>> We are also working on scoped loops for Q1 2017 which we can share if
> >>>>> there is enough interest.
> >>>>>
> >>>>> For now, that is an improvement proposal that solves two pending
> major
> >>>>> issues:
> >>>>>
> >>>>> 1) The (not so trivial) problem of correct termination of jobs with
> >>>>> iterations
> >>>>> 2) The applicability of the checkpointing algorithm to iterative
> >>>>> dataflow
> >>>>> graphs.
> >>>>>
> >>>>> We would really appreciate it if you go through the linked draft
> >>>>> (motivation and proposed changes) for FLIP-13 and point out comments,
> >>>>> preferably publicly in this devlist discussion before we go ahead and
> >>>>> update the wiki.
> >>>>>
> >>>>> https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing
> >>>>>
> >>>>> cheers
> >>>>>
> >>>>> Paris and Fouad
> >>>>>
> >>>>>
> >>>
> >>>
> >
>
>

Re: [DISCUSS] FLIP-14: Loops API and Termination

Posted by Paris Carbone <pa...@kth.se>.
We do not have to schedule this for an early Flink release, just saying.
I would just like to get the changes out so that you can review and integrate them at your own pace.

Who is the admin of the wiki? It would be nice to get write access.

> On 17 Nov 2016, at 13:45, Paris Carbone <pa...@kth.se> wrote:
> 
> Sounds like a plan!
> 
> Can someone grant me access to write in the wiki please?
> My username is “senorcarbone”.
> 
> Paris
> 


Re: [DISCUSS] FLIP-14: Loops API and Termination

Posted by Paris Carbone <pa...@kth.se>.
Sounds like a plan!

Can someone grant me access to write in the wiki please?
My username is “senorcarbone”.

Paris

> On 16 Nov 2016, at 14:30, Gyula Fóra <gy...@gmail.com> wrote:
> 
> I am not completely sure whether we should deprecate the old API for 1.2 or
> remove it completely. Personally I am in favor of removing it, I don't
> think it is a huge burden to move to the new one if it makes for a much
> nicer user experience.
> 
> I think you can go ahead and add the FLIP to the wiki and open the PR so we can
> start the review if you have it ready anyway.
> 
> Gyula
> 
> Paris Carbone <pa...@kth.se> wrote (on Wed, 16 Nov 2016, 11:55):
> 
>> Thanks for reviewing, Gyula.
>> 
>> One thing that is still up for discussion is whether we should completely
>> remove the old iterations API or simply mark it as deprecated until v2.0.
>> Also, I am not sure what the best process is now. We have the changes ready.
>> Should I copy the FLIP to the wiki and trigger the PRs, or wait a few
>> more days in case someone has objections?
>> 
>> @Stephan, what is your take on our interpretation of the approach you
>> suggested? Should we proceed or is there anything that you do not find nice?
>> 
>> Paris
>> 
>>> On 15 Nov 2016, at 10:01, Gyula Fóra <gy...@apache.org> wrote:
>>> 
>>> Hi Paris,
>>> 
>>> I like the proposed changes to the iteration API, this cleans up things in
>>> the Java API without any strict restriction I think (it was never a problem
>>> in the Scala API).
>>> 
>>> The termination algorithm based on the proposed scoped loops seems to be
>>> fairly simple and looks good :)
>>> 
>>> Cheers,
>>> Gyula
>>> 
>>> Paris Carbone <pa...@kth.se> wrote (on Mon, 14 Nov 2016, 8:50):
>>> 
>>>> That would be great Shi! Let's take that offline.
>>>> 
>>>> Anyone else interested in the iteration changes? It would be nice to
>>>> incorporate these into v1.2 if possible, so I am counting on your review asap.
>>>> 
>>>> cheers,
>>>> Paris
>>>> 
>>>> On Nov 14, 2016, at 2:59 AM, xiaogang.sxg <xiaogang.sxg@alibaba-inc.com> wrote:
>>>> 
>>>> Hi Paris
>>>> 
>>>> Unfortunately, the project is not public yet.
>>>> But I can provide you with a primitive implementation of the update protocol in
>>>> the paper. It’s implemented in Storm. Since the protocol assumes the
>>>> communication channels between different tasks are dual, I think it’s not
>>>> easy to adapt it to Flink.
>>>> 
>>>> Regards
>>>> Xiaogang
>>>> 
>>>> 
>>>> On 12 Nov 2016, at 3:03 AM, Paris Carbone <parisc@kth.se> wrote:
>>>> 
>>>> Hi Shi,
>>>> 
>>>> Naiad/Timely Dataflow and other projects use global coordination, which is
>>>> very convenient for asynchronous progress tracking in general, but it has
>>>> some downsides in production systems that count on in-flight
>>>> transactional control mechanisms and rollback recovery guarantees. This is
>>>> why we generally prefer decentralized approaches (despite their own
>>>> downsides).
>>>> 
>>>> Regarding synchronous/structured iterations, this is a bit off topic and
>>>> they are a bit of a different story as you already know.
>>>> We maintain a graph streaming (gelly-streams) library on Flink that you
>>>> might find interesting [1]. Vasia, another Flink committer, is also working
>>>> on that, among others.
>>>> You can keep an eye on it since we are planning to use this project as a
>>>> showcase for a new way of doing structured and fixpoint iterations on
>>>> streams in the future.
>>>> 
>>>> P.S. many thanks for sharing your publication, it was an interesting read.
>>>> Do you happen to have your source code public? We could most certainly use
>>>> it in a benchmark soon.
>>>> 
>>>> [1] https://github.com/vasia/gelly-streaming
>>>> 
>>>> 
>>>> On 11 Nov 2016, at 19:18, SHI Xiaogang <shixiaogangg@gmail.com> wrote:
>>>> 
>>>> Hi, Fouad
>>>> 
>>>> Thank you for the explanation. Now the centralized method seems correct to
>>>> me.
>>>> The passing of StatusUpdate events will lead to synchronous iterations, and
>>>> we are using the information in each iteration to terminate the
>>>> computation.
>>>> 
>>>> Actually, I prefer the centralized method because in many applications the
>>>> convergence may depend on some global statistics.
>>>> For example, a PageRank program may terminate the computation when 99% of
>>>> the vertices have converged.
>>>> I think those learning programs which cannot reach the fixed point
>>>> (oscillating around it) can benefit a lot from such features.
>>>> The decentralized method makes it hard to support such convergence
>>>> conditions.
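
To illustrate the kind of global convergence condition mentioned above, here is a minimal Python sketch. It is not Flink code: the function names, the tolerance `tol` and the `threshold` parameter are assumptions made only for this example. The point it tries to show is that the check needs the ranks of all vertices in one place, which is what makes it awkward for a purely decentralized termination protocol.

```python
def converged_fraction(prev_ranks, curr_ranks, tol=1e-4):
    """Fraction of vertices whose rank moved by at most `tol` in the last iteration.

    `prev_ranks` and `curr_ranks` are hypothetical dicts mapping vertex id -> rank.
    """
    if not curr_ranks:
        return 1.0
    stable = sum(
        1
        for vertex, rank in curr_ranks.items()
        if abs(rank - prev_ranks.get(vertex, 0.0)) <= tol
    )
    return stable / len(curr_ranks)


def should_terminate(prev_ranks, curr_ranks, tol=1e-4, threshold=0.99):
    """Global stopping rule: stop once at least `threshold` of all vertices are stable."""
    return converged_fraction(prev_ranks, curr_ranks, tol) >= threshold


if __name__ == "__main__":
    prev = {v: 1.0 for v in range(100)}
    curr = dict(prev)
    curr[0] = 2.0                        # one vertex out of 100 is still changing
    print(should_terminate(prev, curr))  # 99 of 100 vertices are stable
```

A decentralized scheme only sees whether records are still in flight, so a rule like this would presumably need some aggregation step on top of it.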
>>>> 
>>>> 
>>>> Another concern is that Flink cannot produce periodic results in an
>>>> iteration over infinite data streams.
>>>> Take a concrete example. Given an edge stream constructing a graph, the
>>>> user may need the PageRank weight of each vertex in the graphs formed at
>>>> certain instants.
>>>> Currently Flink does not provide any input or iteration information to
>>>> users, making it hard for users to implement such real-time iterative
>>>> applications.
>>>> Such features are supported in both Naiad and Tornado. I think Flink should
>>>> support them as well.
>>>> 
>>>> What do you think?
>>>> 
>>>> Regards
>>>> Xiaogang
>>>> 
>>>> 
>>>> 2016-11-11 19:27 GMT+08:00 Fouad ALi <fouad.alsayadi@gmail.com>:
>>>> 
>>>> Hi Shi,
>>>> 
>>>> It seems that you are referring to the centralized algorithm which is no
>>>> longer the proposed version.
>>>> In the decentralized version (check last doc) there is no master node or
>>>> global coordination involved.
>>>> 
>>>> Let us keep this discussion to the decentralized one if possible.
>>>> 
>>>> To answer your points on the previous approach, there is a catch in your
>>>> trace at t7. Here is what is happening :
>>>> - Head, as well as RS, will receive a 'BroadcastStatusUpdate' from the
>>>> runtime (see 2.1 in the steps).
>>>> - RS and Heads will broadcast a StatusUpdate event and will not notify their
>>>> status.
>>>> - When the StatusUpdate event gets back to the head, it will notify its
>>>> WORKING status.
>>>> 
>>>> Hope that answers your concern.
>>>> 
>>>> Best,
>>>> Fouad
>>>> 
>>>> On Nov 11, 2016, at 6:21 AM, SHI Xiaogang <shixiaogangg@gmail.com> wrote:
>>>> 
>>>> Hi Paris
>>>> 
>>>> I have several concerns about the correctness of the termination
>>>> protocol.
>>>> I think the termination protocol puts an end to the computation even when
>>>> the computation has not converged.
>>>> 
>>>> Suppose there exists a loop context constructed by an OP operator, a Head
>>>> operator and a Tail operator (illustrated in Figure 2 in the first
>>>> draft).
>>>> The stream only contains one record. OP will pass the record to its
>>>> downstream operators 10 times. In other words, the loop should iterate 10
>>>> times.
>>>> 
>>>> If I understood the protocol correctly, the following event sequence may
>>>> happen in the computation:
>>>> t1. RS emits Record to OP. Since RS has reached the "end-of-stream", the
>>>> system enters the Speculative Phase.
>>>> t2. OP receives Record and emits it to TAIL.
>>>> t3. HEAD receives the UpdateStatus event and notifies with an IDLE state.
>>>> t4. OP receives the UpdateStatus event from HEAD and notifies with a
>>>> WORKING state.
>>>> t5. TAIL receives Record and emits it to HEAD.
>>>> t6. TAIL receives the UpdateStatus event from OP and notifies with a
>>>> WORKING state.
>>>> t7. The system starts a new attempt. HEAD receives the UpdateStatus event
>>>> and notifies with an IDLE state. (Record is still in transit.)
>>>> t8. OP receives the UpdateStatus event from HEAD and notifies with an IDLE
>>>> state.
>>>> t9. TAIL receives the UpdateStatus event from OP and notifies with an IDLE
>>>> state.
>>>> t10. HEAD receives Record from TAIL and emits it to OP.
>>>> t11. System puts an end to the computation.
>>>> 
>>>> Though the computation is expected to iterate 10 times, it ends earlier.
>>>> The cause is that the communication channels MASTER=>HEAD and TAIL=>HEAD
>>>> are not synchronized.
>>>> 
>>>> I think the protocol follows the idea of the Chandy-Lamport algorithm to
>>>> determine a global state.
>>>> But the information of whether a node has processed any record since the
>>>> last request is not STABLE.
>>>> Hence I doubt the correctness of the protocol.
>>>> 
>>>> To determine the termination correctly, we need some information that is
>>>> stable.
>>>> In timely dataflow, Naiad collects the progress made in each iteration and
>>>> terminates the loop when little progress is made in an iteration
>>>> (identified by the timestamp vector).
>>>> The information is stable because the result of an iteration cannot be
>>>> changed by the execution of later iterations.
>>>> 
>>>> A similar method is also adopted in Tornado.
>>>> You may see my paper for more details about the termination of loops:
>>>> http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf
>>>> 
>>>> Regards
>>>> Xiaogang
>>>> 
>>>> 2016-11-11 3:19 GMT+08:00 Paris Carbone <parisc@kth.se>:
>>>> 
>>>> Hi again Flink folks,
>>>> 
>>>> Here is our new proposal that addresses Job Termination - the loop fault
>>>> tolerance proposal will follow shortly.
>>>> As Stephan hinted, we need operators to be aware of their scope level.
>>>> 
>>>> Thus, it is time we make loops great again! :)
>>>> 
>>>> Part of this FLIP basically introduces a new functional, compositional API
>>>> for defining asynchronous loops for DataStreams.
>>>> This is coupled with a decentralized algorithm for job termination with
>>>> loops - along the lines of what Stephan described.
>>>> We are already working on the actual prototypes as you can observe in the
>>>> links of the doc.
>>>> 
>>>> Please let us know if you like (or don't like) it and why, in this mail
>>>> discussion.
>>>> 
>>>> https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-PfTHtq3173EhsAkpBoQ
>>>> 
>>>> cheers
>>>> Paris and Fouad
>>>> 
>>>> On 31 Oct 2016, at 12:53, Paris Carbone <parisc@kth.se> wrote:
>>>> 
>>>> Hey Stephan,
>>>> 
>>>> Thanks for looking into it!
>>>> 
>>>> +1 for breaking this up, will do that.
>>>> 
>>>> I can see your point and maybe it makes sense to introduce part of scoping
>>>> to incorporate support for nested loops (otherwise it can’t work).
>>>> Let us think about this a bit. We will share another draft with a more
>>>> detailed description of the approach you are suggesting asap.
>>>> 
>>>> 
>>>> On 27 Oct 2016, at 10:55, Stephan Ewen <sewen@apache.org> wrote:
>>>> 
>>>> How about we break this up into two FLIPs? There are after all two
>>>> orthogonal problems (termination, fault tolerance) with quite different
>>>> discussion states.
>>>> 
>>>> Concerning fault tolerance, I like the ideas.
>>>> For the termination proposal, I would like to iterate a bit more.
>>>> 
>>>> *Termination algorithm:*
>>>> 
>>>> My main concern here is the introduction of a termination coordinator and
>>>> any involvement of RPC messages when deciding termination.
>>>> That would be such a fundamental break with the current runtime
>>>> architecture, and it would make the currently very elegant and simple model
>>>> much more complicated and harder to maintain. Given that Flink's runtime is
>>>> complex enough, I would really like to avoid that.
>>>> 
>>>> The current runtime paradigm coordinates between operators strictly via
>>>> in-band events. RPC calls happen between operators and the master for
>>>> triggering and acknowledging execution and checkpoints.
>>>> 
>>>> I was wondering whether we can keep following that paradigm and still get
>>>> most of what you are proposing here. In some sense, all we need to do is
>>>> replace RPC calls with in-band events, and "decentralize" the coordinator
>>>> such that every operator can make its own termination decision by itself.
>>>> 
>>>> This is only a rough sketch, you probably need to flesh it out more.
>>>> 
>>>> - I assume that the OP in the diagram knows that it is in a loop and that
>>>> it is the one connected to the head and tail
>>>> 
>>>> - When OP receives an EndOfStream event from the regular source (RS), it
>>>> emits an "AttemptTermination" event downstream to the operators involved in
>>>> the loop. It attaches an attempt sequence number and memorizes it
>>>> - Tail and Head forward these events
>>>> - When OP receives the event back with the same attempt sequence number,
>>>> and no records came in the meantime, it shuts down and emits EndOfStream
>>>> downstream
>>>> - When other records came back between emitting the AttemptTermination
>>>> event and receiving it back, then it emits a new AttemptTermination event
>>>> with the next sequence number.
>>>> - This should terminate as soon as the loop is empty.
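
The steps above can be sketched as a small single-threaded simulation in Python. This is only an illustration under stated assumptions, not the FLIP's implementation: the path OP -> Tail -> Head -> OP is modeled as one FIFO queue, each record carries a hypothetical count of how many more times it must pass through OP, and `run_loop` is a name made up for this example.

```python
from collections import deque


def run_loop(initial_hops):
    """Simulate the in-band termination attempts for a single loop.

    `initial_hops` lists the records already in the loop when EndOfStream
    arrives; each entry is how many more passes through OP that record needs.
    Returns (number_of_attempts, records_processed_by_op).
    """
    # The feedback path OP -> Tail -> Head -> OP, assumed to be one FIFO channel.
    feedback = deque(("record", hops) for hops in initial_hops)

    # EndOfStream from the regular source: OP emits the first attempt event.
    seq = 0
    feedback.append(("attempt", seq))
    saw_record = False   # did any record come back since the last attempt was emitted?
    processed = 0

    while True:
        kind, value = feedback.popleft()
        if kind == "record":
            processed += 1
            saw_record = True
            if value > 1:
                # The record needs further iterations: send it around again.
                feedback.append(("record", value - 1))
        elif value == seq and not saw_record:
            # The attempt came back and the loop was empty in between: shut down.
            return seq + 1, processed
        else:
            # Records were seen in the meantime: retry with the next sequence number.
            seq += 1
            saw_record = False
            feedback.append(("attempt", seq))


if __name__ == "__main__":
    print(run_loop([10]))
```

In this model, a single record that must pass through OP 10 times is processed all 10 times before the 11th attempt succeeds. The result relies on the FIFO assumption: an attempt event cannot overtake a record that is still in flight on the same channel.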
>>>> 
>>>> Might this model even generalize to nested loops, where the
>>>> "AttemptTermination" event is scoped by the loop's nesting level?
>>>> 
>>>> Let me know what you think!
>>>> 
>>>> 
>>>> Best,
>>>> Stephan
>>>> 
>>>> 
>>>> On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <sewen@apache.org> wrote:
>>>> 
>>>> Hi!
>>>> 
>>>> I am still scanning it and compiling some comments. Give me a bit ;-)
>>>> 
>>>> Stephan
>>>> 
>>>> 
>>>> On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <parisc@kth.se> wrote:
>>>> 
>>>> Hey all,
>>>> 
>>>> Now that many of you have already scanned the document (judging from the
>>>> views) maybe it is time to give back some feedback!
>>>> Did you like it? Would you suggest an improvement?
>>>> 
>>>> I would suggest not to leave this in the void. It has to do with
>>>> important properties that the system promises to provide.
>>>> Fouad and I will do our best to answer your questions and discuss this
>>>> further.
>>>> 
>>>> cheers
>>>> Paris
>>>> 
>>>> On 21 Oct 2016, at 08:54, Paris Carbone <parisc@kth.se> wrote:
>>>> 
>>>> Hello everyone,
>>>> 
>>>> Loops in Apache Flink have good potential to become a much more
>>>> powerful feature in future versions of Apache Flink.
>>>> There is generally high demand to make them usable and, first of all,
>>>> production-ready for upcoming releases.
>>>> 
>>>> As a first commitment we would like to propose FLIP-13 for consistent
>>>> processing with Loops.
>>>> We are also working on scoped loops for Q1 2017 which we can share if
>>>> there is enough interest.
>>>> 
>>>> For now, that is an improvement proposal that solves two pending major
>>>> issues:
>>>> 
>>>> 1) The (not so trivial) problem of correct termination of jobs with
>>>> iterations
>>>> 2) The applicability of the checkpointing algorithm to iterative dataflow
>>>> graphs.
>>>> 
>>>> We would really appreciate it if you go through the linked draft
>>>> (motivation and proposed changes) for FLIP-13 and point out comments,
>>>> preferably publicly in this devlist discussion before we go ahead and
>>>> update the wiki.
>>>> 
>>>> https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing
>>>> 
>>>> cheers
>>>> 
>>>> Paris and Fouad
>>>> 
>>>> 
>> 
>> 


Re: [DISCUSS] FLIP-14: Loops API and Termination

Posted by Gyula Fóra <gy...@gmail.com>.
I am not completely sure whether we should deprecate the old API for 1.2 or
remove it completely. Personally, I am in favor of removing it; I don't
think it is a huge burden to move to the new one if it makes for a much
nicer user experience.

I think you can go ahead and add the FLIP to the wiki and open the PR so we can
start the review, if you have it ready anyway.

Gyula

On Wed, Nov 16, 2016 at 11:55, Paris Carbone <pa...@kth.se> wrote:

> Thanks for reviewing, Gyula.
>
> One thing that is still up for discussion is whether we should completely
> remove the old iterations API or simply mark it as deprecated until v2.0.
> Also, I am not sure what the best process is now. We have the changes ready.
> Should I copy the FLIP to the wiki and trigger the PRs, or wait a few
> more days in case someone has objections?
>
> @Stephan, what is your take on our interpretation of the approach you
> suggested? Should we proceed or is there anything that you do not find nice?
>
> Paris
>
> > On 15 Nov 2016, at 10:01, Gyula Fóra <gy...@apache.org> wrote:
> >
> > Hi Paris,
> >
> > I like the proposed changes to the iteration API; this cleans up things in
> > the Java API without any strict restriction, I think (it was never a problem
> > in the Scala API).
> >
> > The termination algorithm based on the proposed scoped loops seems to be
> > fairly simple and looks good :)
> >
> > Cheers,
> > Gyula
> >
> > On Mon, Nov 14, 2016 at 8:50, Paris Carbone <pa...@kth.se> wrote:
> >
> >> That would be great Shi! Let's take that offline.
> >>
> >> Anyone else interested in the iteration changes? It would be nice to
> >> incorporate these into v1.2 if possible, so I count on your review asap.
> >>
> >> cheers,
> >> Paris
> >>
> >> On Nov 14, 2016, at 2:59 AM, xiaogang.sxg <xiaogang.sxg@alibaba-inc.com> wrote:
> >>
> >> Hi Paris
> >>
> >> Unfortunately, the project is not public yet.
> >> But I can provide you with a primitive implementation of the update protocol
> >> in the paper. It's implemented in Storm. Since the protocol assumes the
> >> communication channels between different tasks are dual, I think it's not
> >> easy to adapt it to Flink.
> >>
> >> Regards
> >> Xiaogang
> >>
> >>
> >> On Nov 12, 2016, at 3:03 AM, Paris Carbone <parisc@kth.se> wrote:
> >>
> >> Hi Shi,
> >>
> >> Naiad/Timely Dataflow and other projects use global coordination, which is
> >> very convenient for asynchronous progress tracking in general, but it has
> >> some downsides in production systems that count on in-flight
> >> transactional control mechanisms and rollback recovery guarantees. This is
> >> why we generally prefer decentralized approaches (despite their own
> >> downsides).
> >>
> >> Regarding synchronous/structured iterations, this is a bit off topic and
> >> they are a bit of a different story as you already know.
> >> We maintain a graph streaming (gelly-streams) library on Flink that you
> >> might find interesting [1]. Vasia, another Flink committer, is also working
> >> on that, among others.
> >> You can keep an eye on it since we are planning to use this project as a
> >> showcase for a new way of doing structured and fixpoint iterations on
> >> streams in the future.
> >>
> >> P.S. Many thanks for sharing your publication; it was an interesting read.
> >> Do you happen to have your source code public? We could most certainly use
> >> it in a benchmark soon.
> >>
> >> [1] https://github.com/vasia/gelly-streaming
> >>
> >>
> >> On 11 Nov 2016, at 19:18, SHI Xiaogang <shixiaogangg@gmail.com> wrote:
> >>
> >> Hi, Fouad
> >>
> >> Thank you for the explanation. Now the centralized method seems correct to
> >> me.
> >> The passing of StatusUpdate events will lead to synchronous iterations, and
> >> we are using the information in each iteration to terminate the
> >> computation.
> >>
> >> Actually, I prefer the centralized method because in many applications the
> >> convergence may depend on some global statistics.
> >> For example, a PageRank program may terminate the computation when 99% of
> >> the vertices have converged.
> >> I think those learning programs which cannot reach the fixed point
> >> (oscillating around the fixed point) can benefit a lot from such features.
> >> The decentralized method makes it hard to support such convergence
> >> conditions.
> >>
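A global convergence condition like the PageRank example could look roughly like the following. This is a hedged sketch in plain Python, not a Flink API: the names converged_fraction and should_terminate, the tolerance parameter, and the dictionary representation of the ranks are all hypothetical.

```python
def converged_fraction(old_ranks, new_ranks, tolerance=1e-4):
    """Fraction of vertices whose rank changed by at most `tolerance`."""
    if not new_ranks:
        return 1.0
    stable = sum(
        1
        for vertex, rank in new_ranks.items()
        if abs(rank - old_ranks.get(vertex, 0.0)) <= tolerance
    )
    return stable / len(new_ranks)


def should_terminate(old_ranks, new_ranks, threshold=0.99, tolerance=1e-4):
    """Global condition: stop once at least `threshold` of the vertices are stable."""
    return converged_fraction(old_ranks, new_ranks, tolerance) >= threshold
```

Evaluating this needs the ranks of all vertices in one place, which is the point being made: a decision taken locally by each operator does not see such global statistics.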
> >>
> >> Another concern is that Flink cannot produce periodic results in the
> >> iteration over infinite data streams.
> >> Take a concrete example. Given an edge stream constructing a graph, the
> >> user may need the PageRank weight of each vertex in the graphs formed at
> >> certain instants.
> >> Currently Flink does not provide any input or iteration information to
> >> users, making it hard for users to implement such real-time iterative
> >> applications.
> >> Such features are supported in both Naiad and Tornado. I think Flink should
> >> support them as well.
> >>
> >> What do you think?
> >>
> >> Regards
> >> Xiaogang
> >>
> >>
> >> 2016-11-11 19:27 GMT+08:00 Fouad ALi <fouad.alsayadi@gmail.com>:
> >>
> >> Hi Shi,
> >>
> >> It seems that you are referring to the centralized algorithm which is no
> >> longer the proposed version.
> >> In the decentralized version (check last doc) there is no master node or
> >> global coordination involved.
> >>
> >> Let us keep this discussion to the decentralized one if possible.
> >>
> >> To answer your points on the previous approach, there is a catch in your
> >> trace at t7. Here is what is happening:
> >> - Head, as well as RS, will receive a 'BroadcastStatusUpdate' from the
> >> runtime (see 2.1 in the steps).
> >> - RS and Heads will broadcast a StatusUpdate event and will not notify
> >> their status.
> >> - When the StatusUpdate event gets back to the head, it will notify its
> >> WORKING status.
> >>
> >> Hope that answers your concern.
> >>
> >> Best,
> >> Fouad
> >>
> >> On Nov 11, 2016, at 6:21 AM, SHI Xiaogang <shixiaogangg@gmail.com> wrote:
> >>
> >> Hi Paris
> >>
> >> I have several concerns about the correctness of the termination
> >> protocol.
> >> I think the termination protocol puts an end to the computation even when
> >> the computation has not converged.
> >>
> >> Suppose there exists a loop context constructed by an OP operator, a Head
> >> operator and a Tail operator (illustrated in Figure 2 in the first draft).
> >> The stream only contains one record. OP will pass the record to its
> >> downstream operators 10 times. In other words, the loop should iterate 10
> >> times.
> >>
> >> If I understood the protocol correctly, the following event sequence may
> >> happen in the computation:
> >> t1. RS emits Record to OP. Since RS has reached the "end-of-stream", the
> >> system enters the Speculative Phase.
> >> t2. OP receives Record and emits it to TAIL.
> >> t3. HEAD receives the UpdateStatus event, and notifies with an IDLE state.
> >> t4. OP receives the UpdateStatus event from HEAD, and notifies with a
> >> WORKING state.
> >> t5. TAIL receives Record and emits it to HEAD.
> >> t6. TAIL receives the UpdateStatus event from OP, and notifies with a
> >> WORKING state.
> >> t7. The system starts a new attempt. HEAD receives the UpdateStatus event
> >> and notifies with an IDLE state. (Record is still in transit.)
> >> t8. OP receives the UpdateStatus event from HEAD and notifies with an IDLE
> >> state.
> >> t9. TAIL receives the UpdateStatus event from OP and notifies with an IDLE
> >> state.
> >> t10. HEAD receives Record from TAIL and emits it to OP.
> >> t11. System puts an end to the computation.
> >>
> >> Though the computation is expected to iterate 10 times, it ends earlier.
> >> The cause is that the communication channels of MASTER=>HEAD and TAIL=>HEAD
> >> are not synchronized.
> >>
> >> I think the protocol follows the idea of the Chandy-Lamport algorithm to
> >> determine a global state.
> >> But the information of whether a node has processed any record since the
> >> last request is not STABLE.
> >> Hence I doubt the correctness of the protocol.
> >>
> >> To determine the termination correctly, we need some information that is
> >> stable.
> >> In timely dataflow, Naiad collects the progress made in each iteration and
> >> terminates the loop when little progress is made in an iteration
> >> (identified by the timestamp vector).
> >> The information is stable because the result of an iteration cannot be
> >> changed by the execution of later iterations.
> >>
> >> A similar method is also adopted in Tornado.
> >> You may see my paper for more details about the termination of loops:
> >> http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf
> >>
> >> Regards
> >> Xiaogang
> >>
>
>

Re: [DISCUSS] FLIP-14: Loops API and Termination

Posted by Paris Carbone <pa...@kth.se>.
Thanks for reviewing, Gyula.

One thing that is still up for discussion is whether we should completely remove the old iterations API or simply mark it as deprecated until v2.0.
Also, I am not sure what the best process is now. We have the changes ready. Should I copy the FLIP to the wiki and trigger the PRs, or wait a few more days in case someone has objections?

@Stephan, what is your take on our interpretation of the approach you suggested? Should we proceed or is there anything that you do not find nice?

Paris

> On 15 Nov 2016, at 10:01, Gyula Fóra <gy...@apache.org> wrote:
> 
> Hi Paris,
> 
> I like the proposed changes to the iteration API, this cleans up things in
> the Java API without any strict restriction I think (it was never a problem
> in the Scala API).
> 
> The termination algorithm based on the proposed scoped loops seems to be
> fairly simple and looks good :)
> 
> Cheers,
> Gyula
> 
> Paris Carbone <pa...@kth.se> ezt írta (időpont: 2016. nov. 14., H, 8:50):
> 
>> That would be great Shi! Let's take that offline.
>> 
>> Anyone else interested in the iteration changes? It would be nice to
>> incorporate these to v1.2 if possible so I count on your review asap.
>> 
>> cheers,
>> Paris
>> 
>> On Nov 14, 2016, at 2:59 AM, xiaogang.sxg <xiaogang.sxg@alibaba-inc.com
>> <ma...@alibaba-inc.com>> wrote:
>> 
>> Hi Paris
>> 
>> Unfortunately, the project is not public yet.
>> But i can provide you a primitive implementation of the update protocol in
>> the paper. It’s implemented in Storm. Since the protocol assumes the
>> communication channels between different tasks are dual, i think it’s not
>> easy to adapt it to Flink.
>> 
>> Regards
>> Xiaogang
>> 
>> 
>> 在 2016年11月12日,上午3:03,Paris Carbone <pa...@kth.se>>
>> 写道:
>> 
>> Hi Shi,
>> 
>> Naiad/Timely Dataflow and other projects use global coordination which is
>> very convenient for asynchronous progress tracking in general but it has
>> some downsides in a production systems that count on in-flight
>> transactional control mechanisms and rollback recovery guarantees. This is
>> why we generally prefer decentralized approaches (despite their our
>> downsides).
>> 
>> Regarding synchronous/structured iterations, this is a bit off topic and
>> they are a bit of a different story as you already know.
>> We maintain a graph streaming (gelly-streams) library on Flink that you
>> might find interesting [1]. Vasia, another Flink committer is also working
>> on that among others.
>> You can keep an eye on it since we are planning to use this project as a
>> showcase for a new way of doing structured and fixpoint iterations on
>> streams in the future.
>> 
>> P.S. many thanks for sharing your publication, it was an interesting read.
>> Do you happen to have your source code public? We could most certainly use
>> it in an benchmark soon.
>> 
>> [1] https://github.com/vasia/gelly-streaming
>> 
>> 
>> On 11 Nov 2016, at 19:18, SHI Xiaogang <shixiaogangg@gmail.com<mailto:
>> shixiaogangg@gmail.com><ma...@gmail.com>> wrote:
>> 
>> Hi, Fouad
>> 
>> Thank you for the explanation. Now the centralized method seems correct to
>> me.
>> The passing of StatusUpdate events will lead to synchronous iterations and
>> we are using the information in each iterations to terminate the
>> computation.
>> 
>> Actually, i prefer the centralized method because in many applications, the
>> convergence may depend on some global statistics.
>> For example, a PageRank program may terminate the computation when 99%
>> vertices are converged.
>> I think those learning programs which cannot reach the fixed-point
>> (oscillating around the fixed-point) can benefit a lot from such features.
>> The decentralized method makes it hard to support such convergence
>> conditions.
>> 
>> 
>> Another concern is that Flink cannot produce periodical results in the
>> iteration over infinite data streams.
>> Take a concrete example. Given an edge stream constructing a graph, the
>> user may need the PageRank weight of each vertex in the graphs formed at
>> certain instants.
>> Currently Flink does not provide any input or iteration information to
>> users, making users hard to implement such real-time iterative
>> applications.
>> Such features are supported in both Naiad and Tornado. I think Flink should
>> support it as well.
>> 
>> What do you think?
>> 
>> Regards
>> Xiaogang
>> 
>> 
>> 2016-11-11 19:27 GMT+08:00 Fouad ALi <fouad.alsayadi@gmail.com<mailto:
>> fouad.alsayadi@gmail.com><ma...@gmail.com>>:
>> 
>> Hi Shi,
>> 
>> It seems that you are referring to the centralized algorithm which is no
>> longer the proposed version.
>> In the decentralized version (check last doc) there is no master node or
>> global coordination involved.
>> 
>> Let us keep this discussion to the decentralized one if possible.
>> 
>> To answer your points on the previous approach, there is a catch in your
>> trace at t7. Here is what is happening :
>> - Head,as well as RS, will receive  a 'BroadcastStatusUpdate' from
>> runtime (see 2.1 in the steps).
>> - RS and Heads will broadcast StatusUpdate  event and will not notify its
>> status.
>> - When StatusUpdate event gets back to the head it will notify its
>> WORKING  status.
>> 
>> Hope that answers your concern.
>> 
>> Best,
>> Fouad
>> 
>> On Nov 11, 2016, at 6:21 AM, SHI Xiaogang <shixiaogangg@gmail.com<mailto:
>> shixiaogangg@gmail.com><ma...@gmail.com>>
>> wrote:
>> 
>> Hi Paris
>> 
>> I have several concerns about the correctness of the termination
>> protocol.
>> I think the termination protocol put an end to the computation even when
>> the computation has not converged.
>> 
>> Suppose there exists a loop context constructed by a OP operator, a Head
>> operator and a Tail operator (illustrated in Figure 2 in the first
>> draft).
>> The stream only contains one record. OP will pass the record to its
>> downstream operators 10 times. In other words, the loop should iterate 10
>> times.
>> 
>> If I understood the protocol correctly, the following event sequence may
>> happen in the computation:
>> t1:  RS emits Record to OP. Since RS has reached the "end-of-stream", the
>> system enters into Speculative Phase.
>> t2:  OP receives Record and emits it to TAIL.
>> t3:  HEAD receives the UpdateStatus event, and notifies with an IDLE
>> state.
>> t4. OP receives the UpdateStatus event from HEAD, and notifies with an
>> WORKING state.
>> t5. TAIL receives Record and emits it to HEAD.
>> t6. TAIL receives the UpdateStatus event from OP, and notifies with an
>> WORKING state.
>> t7. The system starts a new attempt. HEAD receives the UpdateStatus event
>> and notifies with an IDLE state.  (Record is still in transition.)
>> t8. OP receives the UpdateStatus event from HEAD and notifies with an
>> IDLE
>> state.
>> t9. TAIL receives the UpdateStatus event from OP and notifies with an
>> IDLE
>> state.
>> t10. HEAD receives Record from TAIL and emits it to OP.
>> t11. System puts an end to the computation.
>> 
>> Though the computation is expected to iterate 10 times, it ends earlier.
>> The cause is that the communication channels of MASTER=>HEAD and
>> TAIL=>HEAD
>> are not synchronized.
>> 
>> I think the protocol follows the idea of the Chandy-Lamport algorithm to
>> determine a global state.
>> But the information of whether a node has processed any record to since
>> the
>> last request is not STABLE.
>> Hence i doubt the correctness of the protocol.
>> 
>> To determine the termination correctly, we need some information that is
>> stable.
>> In timelyflow, Naiad collects the progress made in each iteration and
>> terminates the loop when a little progress is made in an iteration
>> (identified by the timestamp vector).
>> The information is stable because the result of an iteration cannot be
>> changed by the execution of later iterations.
>> 
>> A similar method is also adopted in Tornado.
>> You may see my paper for more details about the termination of loops:
>> http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf <
>> http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf>
>> 
>> Regards
>> Xiaogang
>> 
>> 2016-11-11 3:19 GMT+08:00 Paris Carbone <parisc@kth.se<mailto:
>> parisc@kth.se><ma...@kth.se> <mailto:
>> parisc@kth.se<ma...@kth.se>>>:
>> 
>> Hi again Flink folks,
>> 
>> Here is our new proposal that addresses Job Termination - the loop fault
>> tolerance proposal will follow shortly.
>> As Stephan hinted, we need operators to be aware of their scope level.
>> 
>> Thus, it is time we make loops great again! :)
>> 
>> Part of this FLIP basically introduces a new functional, compositional
>> API
>> for defining asynchronous loops for DataStreams.
>> This is coupled with a decentralized algorithm for job termination with
>> loops - along the lines of what Stephan described.
>> We are already working on the actual prototypes as you can observe in
>> the
>> links of the doc.
>> 
>> Please let us know if you like (or don't like) it and why, in this mail
>> discussion.
>> 
>> https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-
>> PfTHtq3173EhsAkpBoQ
>> 
>> cheers
>> Paris and Fouad
>> 
>> On 31 Oct 2016, at 12:53, Paris Carbone <parisc@kth.se<mailto:
>> parisc@kth.se> <mailto:
>> parisc@kth.se<ma...@kth.se>><mailto:parisc@
>> kth.se<http://kth.se/><http://kth.se<http://kth.se/>> <http://kth.se/>>>
>> wrote:
>> 
>> Hey Stephan,
>> 
>> Thanks for looking into it!
>> 
>> +1 for breaking this up, will do that.
>> 
>> I can see your point and maybe it makes sense to introduce part of
>> scoping
>> to incorporate support for nested loops (otherwise it can’t work).
>> Let us think about this a bit. We will share another draft for a more
>> detail description of the approach you are suggesting asap.
>> 
>> 
>> On 27 Oct 2016, at 10:55, Stephan Ewen <sewen@apache.org<mailto:
>> sewen@apache.org><ma...@apache.org> <mailto:
>> sewen@apache.org<ma...@apache.org><mailto:sewen@apache.org
>>>> <mailto:sewen
>> @apache.org<http://apache.org/>>> wrote:
>> 
>> How about we break this up into two FLIPs? There are after all two
>> orthogonal problems (termination, fault tolerance) with quite different
>> discussion states.
>> 
>> Concerning fault tolerance, I like the ideas.
>> For the termination proposal, I would like to iterate a bit more.
>> 
>> *Termination algorithm:*
>> 
>> My main concern here is the introduction of a termination coordinator
>> and
>> any involvement of RPC messages when deciding termination.
>> That would be such a fundamental break with the current runtime
>> architecture, and it would make the currently very elegant and simple
>> model
>> much more complicated and harder to maintain. Given that Flink's
>> runtime is
>> complex enough, I would really like to avoid that.
>> 
>> The current runtime paradigm coordinates between operators strictly via
>> in-band events. RPC calls happen between operators and the master for
>> triggering and acknowledging execution and checkpoints.
>> 
>> I was wondering whether we can keep following that paradigm and still
>> get
>> most of what you are proposing here. In some sense, all we need to do is
>> replace RPC calls with in-band events, and "decentralize" the
>> coordinator
>> such that every operator can make its own termination decision by
>> itself.
>> 
>> This is only a rough sketch, you probably need to flesh it out more.
>> 
>> - I assume that the OP in the diagram knows that it is in a loop and
>> that
>> it is the one connected to the head and tail
>> 
>> - When OP receives and EndOfStream Event from the regular source (RS),
>> it
>> emits an "AttemptTermination" event downstream to the operators
>> involved in
>> the loop. It attaches an attempt sequence number and memorizes that
>> - Tail and Head forward these events
>> - When OP receives the event back with the same attempt sequence number,
>> and no records came in the meantime, it shuts down and emits EndOfStream
>> downstream
>> - When other records came back between emitting the AttemptTermination
>> event and receiving it back, then it emits a new AttemptTermination
>> event
>> with the next sequence number.
>> - This should terminate as soon as the loop is empty.
>> 
>> Might this model even generalize to nested loops, where the
>> "AttemptTermination" event is scoped by the loop's nesting level?
>> 
>> Let me know what you think!
>> 
>> 
>> Best,
>> Stephan
>> 
>> 
>> On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <sewen@apache.org<mailto:
>> sewen@apache.org><ma...@apache.org>
>> <ma...@apache.org><mailto:
>> sewen@apache.org<ma...@apache.org>
>> <ma...@apache.org>>> wrote:
>> 
>> Hi!
>> 
>> I am still scanning it and compiling some comments. Give me a bit ;-)
>> 
>> Stephan
>> 
>> 
>> On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <parisc@kth.se<mailto:
>> parisc@kth.se><ma...@kth.se> <mailto:
>> parisc@kth.se<ma...@kth.se>><mailto:
>> parisc@kth.se<ma...@kth.se> <mailto:
>> parisc@kth.se>>> wrote:
>> 
>> Hey all,
>> 
>> Now that many of you have already scanned the document (judging from the
>> views), maybe it is time to give some feedback!
>> Did you like it? Would you suggest an improvement?
>> 
>> I would suggest not leaving this in the void. It has to do with
>> important properties that the system promises to provide.
>> Fouad and I will do our best to answer your questions and discuss this
>> further.
>> 
>> cheers
>> Paris
>> 
>> On 21 Oct 2016, at 08:54, Paris Carbone <parisc@kth.se> wrote:
>> 
>> Hello everyone,
>> 
>> Loops in Apache Flink have good potential to become much more powerful
>> in future versions of Apache Flink.
>> There is generally high demand to make them usable and, above all,
>> production-ready for upcoming releases.
>> 
>> As a first commitment we would like to propose FLIP-13 for consistent
>> processing with Loops.
>> We are also working on scoped loops for Q1 2017 which we can share if
>> there is enough interest.
>> 
>> For now, this is an improvement proposal that solves two major pending
>> issues:
>> 
>> 1) The (not so trivial) problem of correct termination of jobs with
>> iterations
>> 2) The applicability of the checkpointing algorithm to iterative
>> dataflow graphs.
>> 
>> We would really appreciate it if you go through the linked draft
>> (motivation and proposed changes) for FLIP-13 and share your comments,
>> preferably publicly in this devlist discussion before we go ahead and
>> update the wiki.
>> 
>> https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing
>> 
>> cheers
>> 
>> Paris and Fouad
>> 
>> 


Re: [DISCUSS] FLIP-14: Loops API and Termination

Posted by Gyula Fóra <gy...@apache.org>.
Hi Paris,

I like the proposed changes to the iteration API, this cleans up things in
the Java API without any strict restriction I think (it was never a problem
in the Scala API).

The termination algorithm based on the proposed scoped loops seems to be
fairly simple and looks good :)

Cheers,
Gyula

Paris Carbone <pa...@kth.se> wrote (on Mon, Nov 14, 2016, 8:50):

> That would be great Shi! Let's take that offline.
>
> Anyone else interested in the iteration changes? It would be nice to
> incorporate these into v1.2 if possible, so I am counting on your review asap.
>
> cheers,
> Paris

Re: [DISCUSS] FLIP-14: Loops API and Termination

Posted by Paris Carbone <pa...@kth.se>.
That would be great Shi! Let's take that offline.

Anyone else interested in the iteration changes? It would be nice to incorporate these into v1.2 if possible, so I am counting on your review asap.

cheers,
Paris

On Nov 14, 2016, at 2:59 AM, xiaogang.sxg <xi...@alibaba-inc.com> wrote:

Hi Paris

Unfortunately, the project is not public yet.
But I can provide you with a primitive implementation of the update protocol in the paper. It’s implemented in Storm. Since the protocol assumes the communication channels between different tasks are dual, I think it’s not easy to adapt it to Flink.

Regards
Xiaogang


On Nov 12, 2016, at 3:03 AM, Paris Carbone <pa...@kth.se> wrote:

Hi Shi,

Naiad/Timely Dataflow and other projects use global coordination, which is very convenient for asynchronous progress tracking in general, but it has some downsides in production systems that count on in-flight transactional control mechanisms and rollback recovery guarantees. This is why we generally prefer decentralized approaches (despite their own downsides).

Regarding synchronous/structured iterations, this is a bit off topic and they are a bit of a different story as you already know.
We maintain a graph streaming (gelly-streams) library on Flink that you might find interesting [1]. Vasia, another Flink committer, is also working on that, among others.
You can keep an eye on it since we are planning to use this project as a showcase for a new way of doing structured and fixpoint iterations on streams in the future.

P.S. many thanks for sharing your publication, it was an interesting read. Do you happen to have your source code public? We could most certainly use it in a benchmark soon.

[1] https://github.com/vasia/gelly-streaming


On 11 Nov 2016, at 19:18, SHI Xiaogang <sh...@gmail.com> wrote:

Hi, Fouad

Thank you for the explanation. Now the centralized method seems correct to
me.
The passing of StatusUpdate events will lead to synchronous iterations and
we are using the information in each iteration to terminate the
computation.

Actually, I prefer the centralized method because in many applications, the
convergence may depend on some global statistics.
For example, a PageRank program may terminate the computation when 99% of
the vertices have converged.
I think those learning programs which cannot reach the fixed-point
(oscillating around the fixed-point) can benefit a lot from such features.
The decentralized method makes it hard to support such convergence
conditions.
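
To make the 99% example concrete, here is a rough sketch of such a global convergence test. It only illustrates why the decision needs counts from all partitions; the function name, the tolerance and the data layout (one pair of old/new rank maps per partition) are assumptions for illustration, not an existing Flink API.

```python
def should_terminate(partitions, threshold=0.99, tol=1e-6):
    """Decide termination from per-partition (old_ranks, new_ranks) maps.

    Each partition only knows its own vertices, so the counts have to be
    combined in one place: this sum is the global statistic.
    """
    stable = 0
    total = 0
    for old_ranks, new_ranks in partitions:
        # A vertex counts as converged if its rank moved by at most `tol`.
        stable += sum(
            1 for v in new_ranks if abs(new_ranks[v] - old_ranks[v]) <= tol
        )
        total += len(new_ranks)
    return total > 0 and stable / total >= threshold


# Two partitions with 50 vertices each, ranks unchanged everywhere.
all_stable = [
    ({v: 1.0 for v in range(50)}, {v: 1.0 for v in range(50)}),
    ({v: 1.0 for v in range(50, 100)}, {v: 1.0 for v in range(50, 100)}),
]

# Same graph, but 10 vertices of the second partition still change.
some_moving = [
    all_stable[0],
    ({v: 1.0 for v in range(50, 100)},
     {v: (2.0 if v < 60 else 1.0) for v in range(50, 100)}),
]
```

Neither partition can answer the question alone: the first partition of `some_moving` is fully converged locally, yet the job as a whole is only at 90%.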


Another concern is that Flink cannot produce periodical results in the
iteration over infinite data streams.
Take a concrete example. Given an edge stream constructing a graph, the
user may need the PageRank weight of each vertex in the graphs formed at
certain instants.
Currently Flink does not provide any input or iteration information to
users, making it hard for users to implement such real-time iterative
applications.
Such features are supported in both Naiad and Tornado. I think Flink should
support them as well.

What do you think?

Regards
Xiaogang


2016-11-11 19:27 GMT+08:00 Fouad ALi <fo...@gmail.com>:

Hi Shi,

It seems that you are referring to the centralized algorithm which is no
longer the proposed version.
In the decentralized version (check last doc) there is no master node or
global coordination involved.

Let us keep this discussion to the decentralized one if possible.

To answer your points on the previous approach, there is a catch in your
trace at t7. Here is what is happening:
- Head, as well as RS, will receive a 'BroadcastStatusUpdate' from the
runtime (see 2.1 in the steps).
- RS and Heads will broadcast a StatusUpdate event and will not notify
their status.
- When the StatusUpdate event gets back to the head, it will notify its
WORKING status.

Hope that answers your concern.

Best,
Fouad

On Nov 11, 2016, at 6:21 AM, SHI Xiaogang <sh...@gmail.com> wrote:

Hi Paris

I have several concerns about the correctness of the termination
protocol.
I think the termination protocol puts an end to the computation even when
the computation has not converged.

Suppose there exists a loop context constructed by an OP operator, a Head
operator and a Tail operator (illustrated in Figure 2 in the first
draft).
The stream only contains one record. OP will pass the record to its
downstream operators 10 times. In other words, the loop should iterate 10
times.

If I understood the protocol correctly, the following event sequence may
happen in the computation:
t1:  RS emits Record to OP. Since RS has reached the "end-of-stream", the
system enters the Speculative Phase.
t2:  OP receives Record and emits it to TAIL.
t3:  HEAD receives the UpdateStatus event, and notifies with an IDLE
state.
t4:  OP receives the UpdateStatus event from HEAD, and notifies with a
WORKING state.
t5:  TAIL receives Record and emits it to HEAD.
t6:  TAIL receives the UpdateStatus event from OP, and notifies with a
WORKING state.
t7:  The system starts a new attempt. HEAD receives the UpdateStatus event
and notifies with an IDLE state. (Record is still in transit.)
t8:  OP receives the UpdateStatus event from HEAD and notifies with an
IDLE state.
t9:  TAIL receives the UpdateStatus event from OP and notifies with an
IDLE state.
t10: HEAD receives Record from TAIL and emits it to OP.
t11: System puts an end to the computation.

Though the computation is expected to iterate 10 times, it ends earlier.
The cause is that the communication channels of MASTER=>HEAD and
TAIL=>HEAD are not synchronized.
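
The interleaving described above can be re-enacted with a small simulation. This is a simplified sketch under stated assumptions: status requests are modeled as out-of-band calls that read and reset a per-node flag, while records travel on separate FIFO channels. Node, step and poll are hypothetical names, and the code is not the protocol from the draft.

```python
from collections import deque


class Node:
    def __init__(self, name):
        self.name = name
        self.inbox = deque()   # FIFO data channel into this node
        self.worked = False    # processed a record since the last poll?

    def step(self, downstream):
        """Process one pending record, if any, and forward it."""
        if self.inbox:
            record = self.inbox.popleft()
            self.worked = True
            downstream.inbox.append(record)

    def poll(self):
        """Out-of-band status request: report the flag, then reset it."""
        status = "WORKING" if self.worked else "IDLE"
        self.worked = False
        return status


head, op, tail = Node("HEAD"), Node("OP"), Node("TAIL")

op.inbox.append("Record")   # t1: RS emits Record to OP
op.step(tail)               # t2: OP forwards Record to TAIL
tail.step(head)             # t5: TAIL forwards Record towards HEAD

# First attempt: HEAD has not seen Record yet, OP and TAIL have.
first = [n.poll() for n in (head, op, tail)]
# Second attempt (t7-t9): nobody processed anything since the first poll.
second = [n.poll() for n in (head, op, tail)]
# Record is still queued on the TAIL=>HEAD channel.
in_flight = len(head.inbox)
```

In this model the second attempt sees IDLE on every node although Record has not reached HEAD yet, which is the premature termination the trace describes. The flag is not stable because a poll cannot observe what is still inside a channel.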

I think the protocol follows the idea of the Chandy-Lamport algorithm to
determine a global state.
But the information of whether a node has processed any record since
the last request is not STABLE.
Hence I doubt the correctness of the protocol.

To determine the termination correctly, we need some information that is
stable.
In timely dataflow, Naiad collects the progress made in each iteration and
terminates the loop when little progress is made in an iteration
(identified by the timestamp vector).
The information is stable because the result of an iteration cannot be
changed by the execution of later iterations.

A similar method is also adopted in Tornado.
You may see my paper for more details about the termination of loops:
http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf

Regards
Xiaogang

2016-11-11 3:19 GMT+08:00 Paris Carbone <pa...@kth.se>:

Hi again Flink folks,

Here is our new proposal that addresses Job Termination - the loop fault
tolerance proposal will follow shortly.
As Stephan hinted, we need operators to be aware of their scope level.

Thus, it is time we make loops great again! :)

Part of this FLIP basically introduces a new functional, compositional
API for defining asynchronous loops for DataStreams.
This is coupled with a decentralized algorithm for job termination with
loops - along the lines of what Stephan described.
We are already working on the actual prototypes as you can observe in
the links of the doc.

Please let us know if you like (or don't like) it and why, in this mail
discussion.

https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-PfTHtq3173EhsAkpBoQ

cheers
Paris and Fouad

On 31 Oct 2016, at 12:53, Paris Carbone <pa...@kth.se> wrote:

Hey Stephan,

Thanks for looking into it!

+1 for breaking this up, will do that.

I can see your point and maybe it makes sense to introduce part of
scoping to incorporate support for nested loops (otherwise it can’t work).
Let us think about this a bit. We will share another draft with a more
detailed description of the approach you are suggesting asap.


On 27 Oct 2016, at 10:55, Stephan Ewen <se...@apache.org> wrote:

How about we break this up into two FLIPs? There are after all two
orthogonal problems (termination, fault tolerance) with quite different
discussion states.

Concerning fault tolerance, I like the ideas.
For the termination proposal, I would like to iterate a bit more.

*Termination algorithm:*

My main concern here is the introduction of a termination coordinator
and any involvement of RPC messages when deciding termination.
That would be such a fundamental break with the current runtime
architecture, and it would make the currently very elegant and simple
model much more complicated and harder to maintain. Given that Flink's
runtime is complex enough, I would really like to avoid that.

The current runtime paradigm coordinates between operators strictly via
in-band events. RPC calls happen between operators and the master for
triggering and acknowledging execution and checkpoints.

I was wondering whether we can keep following that paradigm and still
get most of what you are proposing here. In some sense, all we need to
do is replace RPC calls with in-band events, and "decentralize" the
coordinator such that every operator can make its own termination
decision by itself.

This is only a rough sketch, you probably need to flesh it out more.

- I assume that the OP in the diagram knows that it is in a loop and
that it is the one connected to the head and tail

- When OP receives an EndOfStream event from the regular source (RS),
it emits an "AttemptTermination" event downstream to the operators
involved in the loop. It attaches an attempt sequence number and
memorizes it
- Tail and Head forward these events
- When OP receives the event back with the same attempt sequence number,
and no records came in the meantime, it shuts down and emits EndOfStream
downstream
- When other records came back between emitting the AttemptTermination
event and receiving it back, then it emits a new AttemptTermination
event with the next sequence number.
- This should terminate as soon as the loop is empty.
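
A minimal sketch of the steps above, for illustration only. It assumes FIFO channels, a single loop OP -> Tail -> Head -> OP, and that each record carries the number of times OP still feeds it back. The names run_loop, hops and dirty are hypothetical and are not part of Flink or of the FLIP.

```python
from collections import deque


def run_loop(initial_hops):
    """Simulate the loop with in-band AttemptTermination events.

    initial_hops: for each input record, how often OP feeds it back.
    Returns (attempts needed, records processed by OP).
    """
    # One FIFO queue per channel of the loop.
    op_to_tail, tail_to_head, head_to_op = deque(), deque(), deque()

    processed = 0
    # The regular source (RS) delivers every record, then EndOfStream.
    for hops in initial_hops:
        processed += 1
        if hops > 0:
            op_to_tail.append(("record", hops - 1))

    # On EndOfStream, OP emits the first attempt and memorizes its number.
    attempt = 1
    dirty = False  # did a record arrive since the current attempt was sent?
    op_to_tail.append(("attempt", attempt))

    while True:
        # Tail and Head forward whatever they receive, in order.
        while op_to_tail:
            tail_to_head.append(op_to_tail.popleft())
        while tail_to_head:
            head_to_op.append(tail_to_head.popleft())
        while head_to_op:
            kind, value = head_to_op.popleft()
            if kind == "record":
                processed += 1
                dirty = True
                if value > 0:
                    op_to_tail.append(("record", value - 1))
            elif value == attempt:
                if not dirty:
                    # Loop is empty: shut down, emit EndOfStream downstream.
                    return attempt, processed
                # Records came back in the meantime: try again.
                attempt += 1
                dirty = False
                op_to_tail.append(("attempt", attempt))


attempts, processed = run_loop([10])
```

Under these assumptions, a single record that is fed back 10 times needs 11 attempts: the first 10 come back together with a record, and only the 11th finds the loop empty. Because the event travels on the same FIFO channels as the records, it cannot overtake a record that is still in flight.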

Might this model even generalize to nested loops, where the
"AttemptTermination" event is scoped by the loop's nesting level?

Let me know what you think!


Best,
Stephan


On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <se...@apache.org> wrote:

Hi!

I am still scanning it and compiling some comments. Give me a bit ;-)

Stephan


On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <pa...@kth.se> wrote:

Hey all,

Now that many of you have already scanned the document (judging from the
views), maybe it is time to give some feedback!
Did you like it? Would you suggest an improvement?

I would suggest not leaving this in the void. It has to do with
important properties that the system promises to provide.
Fouad and I will do our best to answer your questions and discuss this
further.

cheers
Paris

On 21 Oct 2016, at 08:54, Paris Carbone <pa...@kth.se> wrote:

Hello everyone,

Loops in Apache Flink have good potential to become much more powerful
in future versions of Apache Flink.
There is generally high demand to make them usable and, above all,
production-ready for upcoming releases.

As a first commitment we would like to propose FLIP-13 for consistent
processing with Loops.
We are also working on scoped loops for Q1 2017 which we can share if
there is enough interest.

For now, this is an improvement proposal that solves two major pending
issues:

1) The (not so trivial) problem of correct termination of jobs with
iterations
2) The applicability of the checkpointing algorithm to iterative
dataflow graphs.

We would really appreciate it if you go through the linked draft
(motivation and proposed changes) for FLIP-13 and share your comments,
preferably publicly in this devlist discussion before we go ahead and
update the wiki.

https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing

cheers

Paris and Fouad


Re: [DISCUSS] FLIP-14: Loops API and Termination

Posted by "xiaogang.sxg" <xi...@alibaba-inc.com>.
Hi Paris

Unfortunately, the project is not public yet.
But I can provide you with a primitive implementation of the update protocol in the paper. It’s implemented in Storm. Since the protocol assumes the communication channels between different tasks are dual, I think it’s not easy to adapt it to Flink.

Regards
Xiaogang


> On Nov 12, 2016, at 3:03 AM, Paris Carbone <pa...@kth.se> wrote:
> 
> Hi Shi,
> 
> Naiad/Timely Dataflow and other projects use global coordination, which is very convenient for asynchronous progress tracking in general, but it has some downsides in production systems that count on in-flight transactional control mechanisms and rollback recovery guarantees. This is why we generally prefer decentralized approaches (despite their own downsides).
> 
> Regarding synchronous/structured iterations, this is a bit off topic and they are a bit of a different story as you already know.
> We maintain a graph streaming (gelly-streams) library on Flink that you might find interesting [1]. Vasia, another Flink committer, is also working on that, among others.
> You can keep an eye on it since we are planning to use this project as a showcase for a new way of doing structured and fixpoint iterations on streams in the future.
> 
> P.S. many thanks for sharing your publication, it was an interesting read. Do you happen to have your source code public? We could most certainly use it in a benchmark soon.
> 
> [1] https://github.com/vasia/gelly-streaming
> 
> 
> On 11 Nov 2016, at 19:18, SHI Xiaogang <shixiaogangg@gmail.com> wrote:
> 
> Hi, Fouad
> 
> Thank you for the explanation. Now the centralized method seems correct to
> me.
> The passing of StatusUpdate events will lead to synchronous iterations and
> we are using the information in each iteration to terminate the
> computation.
> 
> Actually, I prefer the centralized method because in many applications, the
> convergence may depend on some global statistics.
> For example, a PageRank program may terminate the computation when 99% of
> the vertices have converged.
> I think those learning programs which cannot reach the fixed-point
> (oscillating around the fixed-point) can benefit a lot from such features.
> The decentralized method makes it hard to support such convergence
> conditions.
> 
> 
> Another concern is that Flink cannot produce periodical results in the
> iteration over infinite data streams.
> Take a concrete example. Given an edge stream constructing a graph, the
> user may need the PageRank weight of each vertex in the graphs formed at
> certain instants.
> Currently Flink does not provide any input or iteration information to
> users, making it hard for users to implement such real-time iterative
> applications.
> Such features are supported in both Naiad and Tornado. I think Flink should
> support them as well.
> 
> What do you think?
> 
> Regards
> Xiaogang
> 
> 
> 2016-11-11 19:27 GMT+08:00 Fouad ALi <fouad.alsayadi@gmail.com>:
> 
> Hi Shi,
> 
> It seems that you are referring to the centralized algorithm which is no
> longer the proposed version.
> In the decentralized version (check last doc) there is no master node or
> global coordination involved.
> 
> Let us keep this discussion to the decentralized one if possible.
> 
> To answer your points on the previous approach, there is a catch in your
> trace at t7. Here is what is happening:
> - Head, as well as RS, will receive a 'BroadcastStatusUpdate' from the
> runtime (see 2.1 in the steps).
> - RS and Heads will broadcast a StatusUpdate event and will not notify
> their status.
> - When the StatusUpdate event gets back to the head, it will notify its
> WORKING status.
> 
> Hope that answers your concern.
> 
> Best,
> Fouad
> 
> On Nov 11, 2016, at 6:21 AM, SHI Xiaogang <shixiaogangg@gmail.com> wrote:
> 
> Hi Paris
> 
> I have several concerns about the correctness of the termination
> protocol.
> I think the termination protocol puts an end to the computation even when
> the computation has not converged.
> 
> Suppose there exists a loop context constructed by an OP operator, a Head
> operator and a Tail operator (illustrated in Figure 2 in the first
> draft).
> The stream only contains one record. OP will pass the record to its
> downstream operators 10 times. In other words, the loop should iterate 10
> times.
> 
> If I understood the protocol correctly, the following event sequence may
> happen in the computation:
> t1. RS emits Record to OP. Since RS has reached the "end-of-stream", the
> system enters the Speculative Phase.
> t2. OP receives Record and emits it to TAIL.
> t3. HEAD receives the UpdateStatus event and notifies with an IDLE
> state.
> t4. OP receives the UpdateStatus event from HEAD and notifies with a
> WORKING state.
> t5. TAIL receives Record and emits it to HEAD.
> t6. TAIL receives the UpdateStatus event from OP and notifies with a
> WORKING state.
> t7. The system starts a new attempt. HEAD receives the UpdateStatus event
> and notifies with an IDLE state. (Record is still in transit.)
> t8. OP receives the UpdateStatus event from HEAD and notifies with an
> IDLE state.
> t9. TAIL receives the UpdateStatus event from OP and notifies with an
> IDLE state.
> t10. HEAD receives Record from TAIL and emits it to OP.
> t11. The system puts an end to the computation.
> 
> Though the computation is expected to iterate 10 times, it ends earlier.
> The cause is that the communication channels MASTER=>HEAD and TAIL=>HEAD
> are not synchronized.
> 
> I think the protocol follows the idea of the Chandy-Lamport algorithm to
> determine a global state.
> But the information of whether a node has processed any record since the
> last request is not STABLE.
> Hence I doubt the correctness of the protocol.
> 
> To determine termination correctly, we need some information that is
> stable.
> In timely dataflow, Naiad collects the progress made in each iteration and
> terminates the loop when little progress is made in an iteration
> (identified by the timestamp vector).
> The information is stable because the result of an iteration cannot be
> changed by the execution of later iterations.
> 
> A similar method is also adopted in Tornado.
> You may see my paper for more details about the termination of loops:
> http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf
> 
> Regards
> Xiaogang
> 
> 2016-11-11 3:19 GMT+08:00 Paris Carbone <parisc@kth.se>:
> 
> Hi again Flink folks,
> 
> Here is our new proposal that addresses Job Termination - the loop fault
> tolerance proposal will follow shortly.
> As Stephan hinted, we need operators to be aware of their scope level.
> 
> Thus, it is time we make loops great again! :)
> 
> Part of this FLIP basically introduces a new functional, compositional
> API
> for defining asynchronous loops for DataStreams.
> This is coupled with a decentralized algorithm for job termination with
> loops - along the lines of what Stephan described.
> We are already working on the actual prototypes as you can observe in
> the
> links of the doc.
> 
> Please let us know if you like (or don't like) it and why, in this mail
> discussion.
> 
> https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-PfTHtq3173EhsAkpBoQ
> 
> cheers
> Paris and Fouad
> 
> On 31 Oct 2016, at 12:53, Paris Carbone <parisc@kth.se> wrote:
> 
> Hey Stephan,
> 
> Thanks for looking into it!
> 
> +1 for breaking this up, will do that.
> 
> I can see your point and maybe it makes sense to introduce part of
> scoping
> to incorporate support for nested loops (otherwise it can’t work).
> Let us think about this a bit. We will share another draft for a more
> detailed description of the approach you are suggesting asap.
> 
> 
> On 27 Oct 2016, at 10:55, Stephan Ewen <sewen@apache.org> wrote:
> 
> How about we break this up into two FLIPs? There are after all two
> orthogonal problems (termination, fault tolerance) with quite different
> discussion states.
> 
> Concerning fault tolerance, I like the ideas.
> For the termination proposal, I would like to iterate a bit more.
> 
> *Termination algorithm:*
> 
> My main concern here is the introduction of a termination coordinator
> and
> any involvement of RPC messages when deciding termination.
> That would be such a fundamental break with the current runtime
> architecture, and it would make the currently very elegant and simple
> model
> much more complicated and harder to maintain. Given that Flink's
> runtime is
> complex enough, I would really like to avoid that.
> 
> The current runtime paradigm coordinates between operators strictly via
> in-band events. RPC calls happen between operators and the master for
> triggering and acknowledging execution and checkpoints.
> 
> I was wondering whether we can keep following that paradigm and still
> get
> most of what you are proposing here. In some sense, all we need to do is
> replace RPC calls with in-band events, and "decentralize" the
> coordinator
> such that every operator can make its own termination decision by
> itself.
> 
> This is only a rough sketch, you probably need to flesh it out more.
> 
> - I assume that the OP in the diagram knows that it is in a loop and
> that
> it is the one connected to the head and tail
> 
> - When OP receives an EndOfStream event from the regular source (RS),
> it emits an "AttemptTermination" event downstream to the operators
> involved in the loop. It attaches an attempt sequence number and
> memorizes that
> - Tail and Head forward these events
> - When OP receives the event back with the same attempt sequence number,
> and no records came in the meantime, it shuts down and emits EndOfStream
> downstream
> - When other records came back between emitting the AttemptTermination
> event and receiving it back, then it emits a new AttemptTermination
> event
> with the next sequence number.
> - This should terminate as soon as the loop is empty.
> 
> Might this model even generalize to nested loops, where the
> "AttemptTermination" event is scoped by the loop's nesting level?
> 
> Let me know what you think!
> 
> 
> Best,
> Stephan
> 
> 
> On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <sewen@apache.org> wrote:
> 
> Hi!
> 
> I am still scanning it and compiling some comments. Give me a bit ;-)
> 
> Stephan
> 
> 
> On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <parisc@kth.se> wrote:
> 
> Hey all,
> 
> Now that many of you have already scanned the document (judging from the
> views) maybe it is time to give back some feedback!
> Did you like it? Would you suggest an improvement?
> 
> I would suggest not to leave this in the void. It has to do with
> important properties that the system promises to provide.
> Me and Fouad will do our best to answer your questions and discuss this
> further.
> 
> cheers
> Paris
> 
> On 21 Oct 2016, at 08:54, Paris Carbone <parisc@kth.se> wrote:
> 
> Hello everyone,
> 
> Loops in Apache Flink have a good potential to become a much more
> powerful thing in future versions of Apache Flink.
> There is generally high demand to make them usable and first of all
> production-ready for upcoming releases.
> 
> As a first commitment we would like to propose FLIP-13 for consistent
> processing with Loops.
> We are also working on scoped loops for Q1 2017 which we can share if
> there is enough interest.
> 
> For now, that is an improvement proposal that solves two pending major
> issues:
> 
> 1) The (not so trivial) problem of correct termination of jobs with
> iterations
> 2) The applicability of the checkpointing algorithm to iterative
> dataflow
> graphs.
> 
> We would really appreciate it if you go through the linked draft
> (motivation and proposed changes) for FLIP-13 and point out comments,
> preferably publicly in this devlist discussion before we go ahead and
> update the wiki.
> 
> https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing
> 
> cheers
> 
> Paris and Fouad


Re: [DISCUSS] FLIP-14: Loops API and Termination

Posted by Paris Carbone <pa...@kth.se>.
Hi Shi,

Naiad/Timely Dataflow and other projects use global coordination, which is very convenient for asynchronous progress tracking in general, but it has some downsides in production systems that count on in-flight transactional control mechanisms and rollback recovery guarantees. This is why we generally prefer decentralized approaches (despite their own downsides).

Regarding synchronous/structured iterations, this is a bit off topic and they are a bit of a different story as you already know.
We maintain a graph streaming (gelly-streams) library on Flink that you might find interesting [1]. Vasia, another Flink committer, is also working on it, among others.
You can keep an eye on it since we are planning to use this project as a showcase for a new way of doing structured and fixpoint iterations on streams in the future.

P.S. Many thanks for sharing your publication, it was an interesting read. Do you happen to have your source code public? We could most certainly use it in a benchmark soon.

[1] https://github.com/vasia/gelly-streaming


On 11 Nov 2016, at 19:18, SHI Xiaogang <sh...@gmail.com> wrote:

Hi, Fouad

Thank you for the explanation. Now the centralized method seems correct to
me.
The passing of StatusUpdate events will lead to synchronous iterations and
we are using the information in each iteration to terminate the
computation.

Actually, I prefer the centralized method because in many applications, the
convergence may depend on some global statistics.
For example, a PageRank program may terminate the computation when 99% of
the vertices have converged.
I think those learning programs which cannot reach the fixed point
(oscillating around it) can benefit a lot from such features.
The decentralized method makes it hard to support such convergence
conditions.


Another concern is that Flink cannot produce periodic results when
iterating over infinite data streams.
Take a concrete example. Given an edge stream constructing a graph, the
user may need the PageRank weight of each vertex in the graphs formed at
certain instants.
Currently Flink does not provide any input or iteration information to
users, making it hard for users to implement such real-time iterative
applications.
Such features are supported in both Naiad and Tornado. I think Flink should
support them as well.

What do you think?

Regards
Xiaogang


2016-11-11 19:27 GMT+08:00 Fouad ALi <fo...@gmail.com>:

Hi Shi,

It seems that you are referring to the centralized algorithm which is no
longer the proposed version.
In the decentralized version (check last doc) there is no master node or
global coordination involved.

Let us keep this discussion to the decentralized one if possible.

To answer your points on the previous approach, there is a catch in your
trace at t7. Here is what is happening:
- Head, as well as RS, will receive a 'BroadcastStatusUpdate' from the
runtime (see 2.1 in the steps).
- RS and Head will broadcast a StatusUpdate event and will not notify
their status.
- When the StatusUpdate event gets back to the head, it will notify its
WORKING status.

Hope that answers your concern.

Best,
Fouad

On Nov 11, 2016, at 6:21 AM, SHI Xiaogang <sh...@gmail.com> wrote:

Hi Paris

I have several concerns about the correctness of the termination
protocol.
I think the termination protocol puts an end to the computation even when
the computation has not converged.

Suppose there exists a loop context constructed by an OP operator, a Head
operator and a Tail operator (illustrated in Figure 2 in the first
draft).
The stream only contains one record. OP will pass the record to its
downstream operators 10 times. In other words, the loop should iterate 10
times.

If I understood the protocol correctly, the following event sequence may
happen in the computation:
t1. RS emits Record to OP. Since RS has reached the "end-of-stream", the
system enters the Speculative Phase.
t2. OP receives Record and emits it to TAIL.
t3. HEAD receives the UpdateStatus event and notifies with an IDLE
state.
t4. OP receives the UpdateStatus event from HEAD and notifies with a
WORKING state.
t5. TAIL receives Record and emits it to HEAD.
t6. TAIL receives the UpdateStatus event from OP and notifies with a
WORKING state.
t7. The system starts a new attempt. HEAD receives the UpdateStatus event
and notifies with an IDLE state. (Record is still in transit.)
t8. OP receives the UpdateStatus event from HEAD and notifies with an
IDLE state.
t9. TAIL receives the UpdateStatus event from OP and notifies with an
IDLE state.
t10. HEAD receives Record from TAIL and emits it to OP.
t11. The system puts an end to the computation.

Though the computation is expected to iterate 10 times, it ends earlier.
The cause is that the communication channels MASTER=>HEAD and TAIL=>HEAD
are not synchronized.

I think the protocol follows the idea of the Chandy-Lamport algorithm to
determine a global state.
But the information of whether a node has processed any record since the
last request is not STABLE.
Hence I doubt the correctness of the protocol.

To determine termination correctly, we need some information that is
stable.
In timely dataflow, Naiad collects the progress made in each iteration and
terminates the loop when little progress is made in an iteration
(identified by the timestamp vector).
The information is stable because the result of an iteration cannot be
changed by the execution of later iterations.

A similar method is also adopted in Tornado.
You may see my paper for more details about the termination of loops:
http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf

Regards
Xiaogang

2016-11-11 3:19 GMT+08:00 Paris Carbone <pa...@kth.se>:

Hi again Flink folks,

Here is our new proposal that addresses Job Termination - the loop fault
tolerance proposal will follow shortly.
As Stephan hinted, we need operators to be aware of their scope level.

Thus, it is time we make loops great again! :)

Part of this FLIP basically introduces a new functional, compositional
API
for defining asynchronous loops for DataStreams.
This is coupled with a decentralized algorithm for job termination with
loops - along the lines of what Stephan described.
We are already working on the actual prototypes as you can observe in
the
links of the doc.

Please let us know if you like (or don't like) it and why, in this mail
discussion.

https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-PfTHtq3173EhsAkpBoQ

cheers
Paris and Fouad

On 31 Oct 2016, at 12:53, Paris Carbone <parisc@kth.se> wrote:

Hey Stephan,

Thanks for looking into it!

+1 for breaking this up, will do that.

I can see your point and maybe it makes sense to introduce part of
scoping
to incorporate support for nested loops (otherwise it can’t work).
Let us think about this a bit. We will share another draft for a more
detailed description of the approach you are suggesting asap.


On 27 Oct 2016, at 10:55, Stephan Ewen <se...@apache.org> wrote:

How about we break this up into two FLIPs? There are after all two
orthogonal problems (termination, fault tolerance) with quite different
discussion states.

Concerning fault tolerance, I like the ideas.
For the termination proposal, I would like to iterate a bit more.

*Termination algorithm:*

My main concern here is the introduction of a termination coordinator
and
any involvement of RPC messages when deciding termination.
That would be such a fundamental break with the current runtime
architecture, and it would make the currently very elegant and simple
model
much more complicated and harder to maintain. Given that Flink's
runtime is
complex enough, I would really like to avoid that.

The current runtime paradigm coordinates between operators strictly via
in-band events. RPC calls happen between operators and the master for
triggering and acknowledging execution and checkpoints.

I was wondering whether we can keep following that paradigm and still
get
most of what you are proposing here. In some sense, all we need to do is
replace RPC calls with in-band events, and "decentralize" the
coordinator
such that every operator can make its own termination decision by
itself.

This is only a rough sketch, you probably need to flesh it out more.

- I assume that the OP in the diagram knows that it is in a loop and
that it is the one connected to the head and tail

- When OP receives an EndOfStream event from the regular source (RS),
it emits an "AttemptTermination" event downstream to the operators
involved in the loop. It attaches an attempt sequence number and
memorizes that
- Tail and Head forward these events
- When OP receives the event back with the same attempt sequence number,
and no records came in the meantime, it shuts down and emits EndOfStream
downstream
- When other records came back between emitting the AttemptTermination
event and receiving it back, then it emits a new AttemptTermination
event with the next sequence number.
- This should terminate as soon as the loop is empty.
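
The steps above can be sketched as a small single-threaded simulation. This is only an illustration under assumptions that are not part of the thread: the whole path OP -> Tail -> Head -> OP is modelled as one FIFO queue, there is a single input record, and the names run_loop, hops and records_seen are hypothetical. It is not Flink code and does not reflect the prototype linked in the doc.

```python
from collections import deque


def run_loop(hops):
    """Simulate OP in a loop whose feedback path is a single FIFO channel.

    `hops` is how many times OP must process the one input record.
    Returns (records_processed, last_attempt_number).
    """
    channel = deque()  # in-band path OP -> Tail -> Head -> OP (assumed FIFO)
    processed = 0

    def process(remaining):
        nonlocal processed
        processed += 1
        if remaining > 1:  # the record needs another pass through the loop
            channel.append(("record", remaining - 1))

    process(hops)  # the single record arrives from the regular source (RS)

    # EndOfStream from RS: emit AttemptTermination(0) and memorize the number.
    attempt = 0
    channel.append(("attempt", attempt))
    records_seen = False  # any record received since the attempt was emitted?

    while channel:
        kind, value = channel.popleft()
        if kind == "record":
            records_seen = True
            process(value)
        elif value == attempt and not records_seen:
            # The event came back and nothing arrived in the meantime:
            # the loop is empty, so OP shuts down and emits EndOfStream.
            return processed, attempt
        else:
            # Records came back in the meantime: retry with the next number.
            attempt += 1
            records_seen = False
            channel.append(("attempt", attempt))
    raise RuntimeError("channel drained without a termination decision")
```

With hops = 10 the record is processed all 10 times and the simulated OP only shuts down on attempt number 9, because every earlier attempt event returns after a record has been seen. The sketch leans entirely on the FIFO assumption: the attempt event can never overtake a record that is still in the loop.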

Might this model even generalize to nested loops, where the
"AttemptTermination" event is scoped by the loop's nesting level?

Let me know what you think!


Best,
Stephan


On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <se...@apache.org> wrote:

Hi!

I am still scanning it and compiling some comments. Give me a bit ;-)

Stephan


On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <pa...@kth.se> wrote:

Hey all,

Now that many of you have already scanned the document (judging from the
views) maybe it is time to give back some feedback!
Did you like it? Would you suggest an improvement?

I would suggest not to leave this in the void. It has to do with
important properties that the system promises to provide.
Me and Fouad will do our best to answer your questions and discuss this
further.

cheers
Paris

On 21 Oct 2016, at 08:54, Paris Carbone <pa...@kth.se> wrote:

Hello everyone,

Loops in Apache Flink have a good potential to become a much more
powerful thing in future versions of Apache Flink.
There is generally high demand to make them usable and first of all
production-ready for upcoming releases.

As a first commitment we would like to propose FLIP-13 for consistent
processing with Loops.
We are also working on scoped loops for Q1 2017 which we can share if
there is enough interest.

For now, that is an improvement proposal that solves two pending major
issues:

1) The (not so trivial) problem of correct termination of jobs with
iterations
2) The applicability of the checkpointing algorithm to iterative
dataflow
graphs.

We would really appreciate it if you go through the linked draft
(motivation and proposed changes) for FLIP-13 and point out comments,
preferably publicly in this devlist discussion before we go ahead and
update the wiki.

https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing

cheers

Paris and Fouad




Re: [DISCUSS] FLIP-14: Loops API and Termination

Posted by SHI Xiaogang <sh...@gmail.com>.
Hi, Fouad

Thank you for the explanation. Now the centralized method seems correct to
me.
The passing of StatusUpdate events will lead to synchronous iterations and
we are using the information in each iteration to terminate the
computation.

Actually, I prefer the centralized method because in many applications, the
convergence may depend on some global statistics.
For example, a PageRank program may terminate the computation when 99% of
the vertices have converged.
I think those learning programs which cannot reach the fixed point
(oscillating around it) can benefit a lot from such features.
The decentralized method makes it hard to support such convergence
conditions.
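
As a rough illustration of such a global convergence condition, the check below terminates once a given fraction of vertices has stopped changing between two iterations. It is a sketch only: the function names, the dict-based rank representation and the tolerance value are assumptions made for this example, not anything defined by Flink, Naiad or Tornado.

```python
def converged_fraction(previous, current, tolerance=1e-4):
    """Fraction of vertices whose rank moved by at most `tolerance`.

    `previous` and `current` map a vertex id to its rank in two consecutive
    iterations (hypothetical representation).
    """
    if not current:
        return 1.0  # nothing left to converge
    stable = sum(
        1
        for vertex, rank in current.items()
        if abs(rank - previous.get(vertex, 0.0)) <= tolerance
    )
    return stable / len(current)


def should_terminate(previous, current, threshold=0.99, tolerance=1e-4):
    """Global condition: stop once `threshold` of all vertices are stable."""
    return converged_fraction(previous, current, tolerance) >= threshold
```

The point of the example is that the decision needs the ranks of all vertices at once, which is exactly the kind of global statistic that a purely per-operator termination decision does not have.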


Another concern is that Flink cannot produce periodic results when
iterating over infinite data streams.
Take a concrete example. Given an edge stream constructing a graph, the
user may need the PageRank weight of each vertex in the graphs formed at
certain instants.
Currently Flink does not provide any input or iteration information to
users, making it hard for users to implement such real-time iterative
applications.
Such features are supported in both Naiad and Tornado. I think Flink should
support them as well.

What do you think?

Regards
Xiaogang


2016-11-11 19:27 GMT+08:00 Fouad ALi <fo...@gmail.com>:

> Hi Shi,
>
> It seems that you are referring to the centralized algorithm which is no
> longer the proposed version.
> In the decentralized version (check last doc) there is no master node or
> global coordination involved.
>
> Let us keep this discussion to the decentralized one if possible.
>
> To answer your points on the previous approach, there is a catch in your
> trace at t7. Here is what is happening:
>  - Head, as well as RS, will receive a 'BroadcastStatusUpdate' from the
> runtime (see 2.1 in the steps).
>  - RS and Head will broadcast a StatusUpdate event and will not notify
> their status.
>  - When the StatusUpdate event gets back to the head, it will notify its
> WORKING status.
>
> Hope that answers your concern.
>
> Best,
> Fouad
>
> > On Nov 11, 2016, at 6:21 AM, SHI Xiaogang <sh...@gmail.com> wrote:
> >
> > Hi Paris
> >
> > I have several concerns about the correctness of the termination
> protocol.
> > I think the termination protocol puts an end to the computation even when
> > the computation has not converged.
> >
> > Suppose there exists a loop context constructed by an OP operator, a Head
> > operator and a Tail operator (illustrated in Figure 2 in the first
> draft).
> > The stream only contains one record. OP will pass the record to its
> > downstream operators 10 times. In other words, the loop should iterate 10
> > times.
> >
> > If I understood the protocol correctly, the following event sequence may
> > happen in the computation:
> > t1. RS emits Record to OP. Since RS has reached the "end-of-stream", the
> > system enters the Speculative Phase.
> > t2. OP receives Record and emits it to TAIL.
> > t3. HEAD receives the UpdateStatus event and notifies with an IDLE
> > state.
> > t4. OP receives the UpdateStatus event from HEAD and notifies with a
> > WORKING state.
> > t5. TAIL receives Record and emits it to HEAD.
> > t6. TAIL receives the UpdateStatus event from OP and notifies with a
> > WORKING state.
> > t7. The system starts a new attempt. HEAD receives the UpdateStatus event
> > and notifies with an IDLE state. (Record is still in transit.)
> > t8. OP receives the UpdateStatus event from HEAD and notifies with an
> > IDLE state.
> > t9. TAIL receives the UpdateStatus event from OP and notifies with an
> > IDLE state.
> > t10. HEAD receives Record from TAIL and emits it to OP.
> > t11. The system puts an end to the computation.
> >
> > Though the computation is expected to iterate 10 times, it ends earlier.
> > The cause is that the communication channels MASTER=>HEAD and TAIL=>HEAD
> > are not synchronized.
> >
> > I think the protocol follows the idea of the Chandy-Lamport algorithm to
> > determine a global state.
> > But the information of whether a node has processed any record since the
> > last request is not STABLE.
> > Hence I doubt the correctness of the protocol.
> >
> > To determine termination correctly, we need some information that is
> > stable.
> > In timely dataflow, Naiad collects the progress made in each iteration and
> > terminates the loop when little progress is made in an iteration
> > (identified by the timestamp vector).
> > The information is stable because the result of an iteration cannot be
> > changed by the execution of later iterations.
> >
> > A similar method is also adopted in Tornado.
> > You may see my paper for more details about the termination of loops:
> > http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf
> >
> > Regards
> > Xiaogang
> >
> > 2016-11-11 3:19 GMT+08:00 Paris Carbone <parisc@kth.se>:
> >
> >> Hi again Flink folks,
> >>
> >> Here is our new proposal that addresses Job Termination - the loop fault
> >> tolerance proposal will follow shortly.
> >> As Stephan hinted, we need operators to be aware of their scope level.
> >>
> >> Thus, it is time we make loops great again! :)
> >>
> >> Part of this FLIP basically introduces a new functional, compositional
> API
> >> for defining asynchronous loops for DataStreams.
> >> This is coupled with a decentralized algorithm for job termination with
> >> loops - along the lines of what Stephan described.
> >> We are already working on the actual prototypes as you can observe in
> the
> >> links of the doc.
> >>
> >> Please let us know if you like (or don't like) it and why, in this mail
> >> discussion.
> >>
> >> https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-PfTHtq3173EhsAkpBoQ
> >>
> >> cheers
> >> Paris and Fouad
> >>
> >> On 31 Oct 2016, at 12:53, Paris Carbone <parisc@kth.se> wrote:
> >>
> >> Hey Stephan,
> >>
> >> Thanks for looking into it!
> >>
> >> +1 for breaking this up, will do that.
> >>
> >> I can see your point and maybe it makes sense to introduce part of
> scoping
> >> to incorporate support for nested loops (otherwise it can’t work).
> >> Let us think about this a bit. We will share another draft for a more
> >> detailed description of the approach you are suggesting asap.
> >>
> >>
> >> On 27 Oct 2016, at 10:55, Stephan Ewen <sewen@apache.org> wrote:
> >>
> >> How about we break this up into two FLIPs? There are after all two
> >> orthogonal problems (termination, fault tolerance) with quite different
> >> discussion states.
> >>
> >> Concerning fault tolerance, I like the ideas.
> >> For the termination proposal, I would like to iterate a bit more.
> >>
> >> *Termination algorithm:*
> >>
> >> My main concern here is the introduction of a termination coordinator
> and
> >> any involvement of RPC messages when deciding termination.
> >> That would be such a fundamental break with the current runtime
> >> architecture, and it would make the currently very elegant and simple
> model
> >> much more complicated and harder to maintain. Given that Flink's
> runtime is
> >> complex enough, I would really like to avoid that.
> >>
> >> The current runtime paradigm coordinates between operators strictly via
> >> in-band events. RPC calls happen between operators and the master for
> >> triggering and acknowledging execution and checkpoints.
> >>
> >> I was wondering whether we can keep following that paradigm and still
> get
> >> most of what you are proposing here. In some sense, all we need to do is
> >> replace RPC calls with in-band events, and "decentralize" the
> coordinator
> >> such that every operator can make its own termination decision by
> itself.
> >>
> >> This is only a rough sketch, you probably need to flesh it out more.
> >>
> >> - I assume that the OP in the diagram knows that it is in a loop and
> that
> >> it is the one connected to the head and tail
> >>
> >> - When OP receives an EndOfStream event from the regular source (RS),
> >> it emits an "AttemptTermination" event downstream to the operators
> >> involved in the loop. It attaches an attempt sequence number and
> >> memorizes that
> >> - Tail and Head forward these events
> >> - When OP receives the event back with the same attempt sequence number,
> >> and no records came in the meantime, it shuts down and emits EndOfStream
> >> downstream
> >> - When other records came back between emitting the AttemptTermination
> >> event and receiving it back, then it emits a new AttemptTermination
> event
> >> with the next sequence number.
> >> - This should terminate as soon as the loop is empty.
> >>
> >> Might this model even generalize to nested loops, where the
> >> "AttemptTermination" event is scoped by the loop's nesting level?
> >>
> >> Let me know what you think!
> >>
> >>
> >> Best,
> >> Stephan
> >>
> >>
> >> On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <sewen@apache.org> wrote:
> >>
> >> Hi!
> >>
> >> I am still scanning it and compiling some comments. Give me a bit ;-)
> >>
> >> Stephan
> >>
> >>
> >> On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <parisc@kth.se> wrote:
> >>
> >> Hey all,
> >>
> >> Now that many of you have already scanned the document (judging from the
> >> views) maybe it is time to give back some feedback!
> >> Did you like it? Would you suggest an improvement?
> >>
> >> I would suggest not to leave this in the void. It has to do with
> >> important properties that the system promises to provide.
> >> Me and Fouad will do our best to answer your questions and discuss this
> >> further.
> >>
> >> cheers
> >> Paris
> >>
> >> On 21 Oct 2016, at 08:54, Paris Carbone <parisc@kth.se> wrote:
> >>
> >> Hello everyone,
> >>
> >> Loops in Apache Flink have a good potential to become a much more
> >> powerful thing in future versions of Apache Flink.
> >> There is generally high demand to make them usable and first of all
> >> production-ready for upcoming releases.
> >>
> >> As a first commitment we would like to propose FLIP-13 for consistent
> >> processing with Loops.
> >> We are also working on scoped loops for Q1 2017 which we can share if
> >> there is enough interest.
> >>
> >> For now, that is an improvement proposal that solves two pending major
> >> issues:
> >>
> >> 1) The (not so trivial) problem of correct termination of jobs with
> >> iterations
> >> 2) The applicability of the checkpointing algorithm to iterative dataflow
> >> graphs.
> >>
> >> We would really appreciate it if you go through the linked draft
> >> (motivation and proposed changes) for FLIP-13 and point out comments,
> >> preferably publicly in this devlist discussion before we go ahead and
> >> update the wiki.
> >>
> >> https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing
> >>
> >> cheers
> >>
> >> Paris and Fouad
>
>

Re: [DISCUSS] FLIP-14: Loops API and Termination

Posted by Fouad ALi <fo...@gmail.com>.
Hi Shi,

It seems that you are referring to the centralized algorithm, which is no longer the proposed version.
In the decentralized version (see the latest doc) there is no master node or global coordination involved.

Let us keep this discussion to the decentralized one if possible.

To answer your points on the previous approach, there is a catch in your trace at t7. Here is what happens:
 - The Head, as well as RS, receives a 'BroadcastStatusUpdate' from the runtime (see 2.1 in the steps).
 - RS and the Heads broadcast a StatusUpdate event and do not notify their status yet.
 - When the StatusUpdate event gets back to the Head, it notifies its WORKING status.
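
A minimal sketch of the three steps above, in Python. It is only an illustration under assumptions that are not spelled out in this mail: the feedback channel is modelled as a FIFO queue, the StatusUpdate event travels on the same channel as the records, and the function name is made up for the example.

```python
from collections import deque


def head_status_after_round_trip(records_in_flight):
    """Status the Head would notify once its StatusUpdate event comes back.

    Assumption: the loop's feedback channel is FIFO, so the event cannot
    overtake records that were already in flight when it was broadcast.
    """
    # Records that are still travelling through the loop.
    feedback = deque(("record", i) for i in range(records_in_flight))
    # The Head broadcasts the event instead of notifying right away,
    # so the event queues up behind the in-flight records.
    feedback.append(("status_update", None))

    saw_record = False
    while feedback:
        kind, _ = feedback.popleft()
        if kind == "record":
            saw_record = True  # the Head did some work in the meantime
        else:
            # The event is back at the Head: only now does it notify.
            return "WORKING" if saw_record else "IDLE"
```

With one record still in the loop the Head ends up notifying WORKING; with an empty loop it notifies IDLE.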

Hope that answers your concern.

Best,
Fouad

> On Nov 11, 2016, at 6:21 AM, SHI Xiaogang <sh...@gmail.com> wrote:
> 
> Hi Paris
> 
> I have several concerns about the correctness of the termination protocol.
> I think the termination protocol put an end to the computation even when
> the computation has not converged.
> 
> Suppose there exists a loop context constructed by a OP operator, a Head
> operator and a Tail operator (illustrated in Figure 2 in the first draft).
> The stream only contains one record. OP will pass the record to its
> downstream operators 10 times. In other words, the loop should iterate 10
> times.
> 
> If I understood the protocol correctly, the following event sequence may
> happen in the computation:
> t1:  RS emits Record to OP. Since RS has reached the "end-of-stream", the
> system enters into Speculative Phase.
> t2:  OP receives Record and emits it to TAIL.
> t3:  HEAD receives the UpdateStatus event, and notifies with an IDLE state.
> t4. OP receives the UpdateStatus event from HEAD, and notifies with an
> WORKING state.
> t5. TAIL receives Record and emits it to HEAD.
> t6. TAIL receives the UpdateStatus event from OP, and notifies with an
> WORKING state.
> t7. The system starts a new attempt. HEAD receives the UpdateStatus event
> and notifies with an IDLE state.  (Record is still in transition.)
> t8. OP receives the UpdateStatus event from HEAD and notifies with an IDLE
> state.
> t9. TAIL receives the UpdateStatus event from OP and notifies with an IDLE
> state.
> t10. HEAD receives Record from TAIL and emits it to OP.
> t11. System puts an end to the computation.
> 
> Though the computation is expected to iterate 10 times, it ends earlier.
> The cause is that the communication channels of MASTER=>HEAD and TAIL=>HEAD
> are not synchronized.
> 
> I think the protocol follows the idea of the Chandy-Lamport algorithm to
> determine a global state.
> But the information of whether a node has processed any record to since the
> last request is not STABLE.
> Hence i doubt the correctness of the protocol.
> 
> To determine the termination correctly, we need some information that is
> stable.
> In timelyflow, Naiad collects the progress made in each iteration and
> terminates the loop when a little progress is made in an iteration
> (identified by the timestamp vector).
> The information is stable because the result of an iteration cannot be
> changed by the execution of later iterations.
> 
> A similar method is also adopted in Tornado.
> You may see my paper for more details about the termination of loops:
> http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf
> 
> Regards
> Xiaogang
> 
> 2016-11-11 3:19 GMT+08:00 Paris Carbone <parisc@kth.se>:
> 
>> Hi again Flink folks,
>> 
>> Here is our new proposal that addresses Job Termination - the loop fault
>> tolerance proposal will follow shortly.
>> As Stephan hinted, we need operators to be aware of their scope level.
>> 
>> Thus, it is time we make loops great again! :)
>> 
>> Part of this FLIP basically introduces a new functional, compositional API
>> for defining asynchronous loops for DataStreams.
>> This is coupled with a decentralized algorithm for job termination with
>> loops - along the lines of what Stephan described.
>> We are already working on the actual prototypes as you can observe in the
>> links of the doc.
>> 
>> Please let us know if you like (or don't like) it and why, in this mail
>> discussion.
>> 
>> https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-PfTHtq3173EhsAkpBoQ
>> 
>> cheers
>> Paris and Fouad
>> 
>> On 31 Oct 2016, at 12:53, Paris Carbone <parisc@kth.se> wrote:
>> 
>> Hey Stephan,
>> 
>> Thanks for looking into it!
>> 
>> +1 for breaking this up, will do that.
>> 
>> I can see your point and maybe it makes sense to introduce part of scoping
>> to incorporate support for nested loops (otherwise it can’t work).
>> Let us think about this a bit. We will share another draft for a more
>> detail description of the approach you are suggesting asap.
>> 
>> 
>> On 27 Oct 2016, at 10:55, Stephan Ewen <sewen@apache.org> wrote:
>> 
>> How about we break this up into two FLIPs? There are after all two
>> orthogonal problems (termination, fault tolerance) with quite different
>> discussion states.
>> 
>> Concerning fault tolerance, I like the ideas.
>> For the termination proposal, I would like to iterate a bit more.
>> 
>> *Termination algorithm:*
>> 
>> My main concern here is the introduction of a termination coordinator and
>> any involvement of RPC messages when deciding termination.
>> That would be such a fundamental break with the current runtime
>> architecture, and it would make the currently very elegant and simple model
>> much more complicated and harder to maintain. Given that Flink's runtime is
>> complex enough, I would really like to avoid that.
>> 
>> The current runtime paradigm coordinates between operators strictly via
>> in-band events. RPC calls happen between operators and the master for
>> triggering and acknowledging execution and checkpoints.
>> 
>> I was wondering whether we can keep following that paradigm and still get
>> most of what you are proposing here. In some sense, all we need to do is
>> replace RPC calls with in-band events, and "decentralize" the coordinator
>> such that every operator can make its own termination decision by itself.
>> 
>> This is only a rough sketch, you probably need to flesh it out more.
>> 
>> - I assume that the OP in the diagram knows that it is in a loop and that
>> it is the one connected to the head and tail
>> 
>> - When OP receives and EndOfStream Event from the regular source (RS), it
>> emits an "AttemptTermination" event downstream to the operators involved in
>> the loop. It attaches an attempt sequence number and memorizes that
>> - Tail and Head forward these events
>> - When OP receives the event back with the same attempt sequence number,
>> and no records came in the meantime, it shuts down and emits EndOfStream
>> downstream
>> - When other records came back between emitting the AttemptTermination
>> event and receiving it back, then it emits a new AttemptTermination event
>> with the next sequence number.
>> - This should terminate as soon as the loop is empty.
>> 
>> Might this model even generalize to nested loops, where the
>> "AttemptTermination" event is scoped by the loop's nesting level?
>> 
>> Let me know what you think!
>> 
>> 
>> Best,
>> Stephan
>> 
>> 
>> On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <sewen@apache.org> wrote:
>> 
>> Hi!
>> 
>> I am still scanning it and compiling some comments. Give me a bit ;-)
>> 
>> Stephan
>> 
>> 
>> On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <parisc@kth.se> wrote:
>> 
>> Hey all,
>> 
>> Now that many of you have already scanned the document (judging from the
>> views) maybe it is time to give back some feedback!
>> Did you like it? Would you suggest an improvement?
>> 
>> I would suggest not to leave this in the void. It has to do with
>> important properties that the system promises to provide.
>> Me and Fouad will do our best to answer your questions and discuss this
>> further.
>> 
>> cheers
>> Paris
>> 
>> On 21 Oct 2016, at 08:54, Paris Carbone <parisc@kth.se> wrote:
>> 
>> Hello everyone,
>> 
>> Loops in Apache Flink have a good potential to become a much more
>> powerful thing in future version of Apache Flink.
>> There is generally high demand to make them usable and first of all
>> production-ready for upcoming releases.
>> 
>> As a first commitment we would like to propose FLIP-13 for consistent
>> processing with Loops.
>> We are also working on scoped loops for Q1 2017 which we can share if
>> there is enough interest.
>> 
>> For now, that is an improvement proposal that solves two pending major
>> issues:
>> 
>> 1) The (not so trivial) problem of correct termination of jobs with
>> iterations
>> 2) The applicability of the checkpointing algorithm to iterative dataflow
>> graphs.
>> 
>> We would really appreciate it if you go through the linked draft
>> (motivation and proposed changes) for FLIP-13 and point out comments,
>> preferably publicly in this devlist discussion before we go ahead and
>> update the wiki.
>> 
>> https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing
>> 
>> cheers
>> 
>> Paris and Fouad


Re: [DISCUSS] FLIP-14: Loops API and Termination

Posted by SHI Xiaogang <sh...@gmail.com>.
Hi Paris

I have several concerns about the correctness of the termination protocol.
I think the termination protocol may put an end to the computation even when
the computation has not converged.

Suppose there exists a loop context constructed by an OP operator, a Head
operator and a Tail operator (illustrated in Figure 2 in the first draft).
The stream only contains one record. OP will pass the record to its
downstream operators 10 times. In other words, the loop should iterate 10
times.

If I understood the protocol correctly, the following event sequence may
happen in the computation:
t1. RS emits Record to OP. Since RS has reached the "end-of-stream", the
system enters the Speculative Phase.
t2. OP receives Record and emits it to TAIL.
t3. HEAD receives the UpdateStatus event, and notifies with an IDLE state.
t4. OP receives the UpdateStatus event from HEAD, and notifies with a
WORKING state.
t5. TAIL receives Record and emits it to HEAD.
t6. TAIL receives the UpdateStatus event from OP, and notifies with a
WORKING state.
t7. The system starts a new attempt. HEAD receives the UpdateStatus event
and notifies with an IDLE state. (Record is still in transit.)
t8. OP receives the UpdateStatus event from HEAD and notifies with an IDLE
state.
t9. TAIL receives the UpdateStatus event from OP and notifies with an IDLE
state.
t10. HEAD receives Record from TAIL and emits it to OP.
t11. The system puts an end to the computation.

Though the computation is expected to iterate 10 times, it ends earlier.
The cause is that the communication channels of MASTER=>HEAD and TAIL=>HEAD
are not synchronized.
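
The trace can be replayed with a small Python sketch. It models only my reading of the protocol as written in t1-t11, not the design doc itself; the class, the function name and the "processed a record since the last probe" flag are assumptions made for the illustration.

```python
from collections import deque


class Operator:
    def __init__(self, name):
        self.name = name
        self.saw_record = False  # any record processed since the last probe?

    def on_record(self):
        self.saw_record = True

    def on_probe(self):
        # Notify WORKING only if a record was processed since the last probe,
        # then reset the flag for the next attempt.
        status = "WORKING" if self.saw_record else "IDLE"
        self.saw_record = False
        return status


def run_trace():
    head, op, tail = Operator("HEAD"), Operator("OP"), Operator("TAIL")
    tail_to_head = deque()  # feedback channel TAIL => HEAD

    op.on_record()                   # t2: OP receives Record
    first = [head.on_probe()]        # t3: HEAD notifies
    first.append(op.on_probe())      # t4: OP notifies
    tail.on_record()                 # t5: TAIL receives Record ...
    tail_to_head.append("Record")    #     ... and emits it towards HEAD
    first.append(tail.on_probe())    # t6: TAIL notifies
    # t7-t9: second attempt, while Record has not yet reached HEAD (t10).
    second = [head.on_probe(), op.on_probe(), tail.on_probe()]
    return first, second, len(tail_to_head)
```

In this replay the first attempt yields IDLE, WORKING, WORKING, and the second attempt yields IDLE three times although one record is still sitting in the TAIL=>HEAD channel.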

I think the protocol follows the idea of the Chandy-Lamport algorithm to
determine a global state.
But the information of whether a node has processed any record since the
last request is not STABLE.
Hence I doubt the correctness of the protocol.

To determine the termination correctly, we need some information that is
stable.
In timely dataflow, Naiad collects the progress made in each iteration and
terminates the loop when little progress is made in an iteration
(identified by the timestamp vector).
The information is stable because the result of an iteration cannot be
changed by the execution of later iterations.

A similar method is also adopted in Tornado.
You may see my paper for more details about the termination of loops:
http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf

Regards
Xiaogang

2016-11-11 3:19 GMT+08:00 Paris Carbone <pa...@kth.se>:

> Hi again Flink folks,
>
> Here is our new proposal that addresses Job Termination - the loop fault
> tolerance proposal will follow shortly.
> As Stephan hinted, we need operators to be aware of their scope level.
>
> Thus, it is time we make loops great again! :)
>
> Part of this FLIP basically introduces a new functional, compositional API
> for defining asynchronous loops for DataStreams.
> This is coupled with a decentralized algorithm for job termination with
> loops - along the lines of what Stephan described.
> We are already working on the actual prototypes as you can observe in the
> links of the doc.
>
> Please let us know if you like (or don't like) it and why, in this mail
> discussion.
>
> https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-PfTHtq3173EhsAkpBoQ
>
> cheers
> Paris and Fouad
>
> On 31 Oct 2016, at 12:53, Paris Carbone <parisc@kth.se> wrote:
>
> Hey Stephan,
>
> Thanks for looking into it!
>
> +1 for breaking this up, will do that.
>
> I can see your point and maybe it makes sense to introduce part of scoping
> to incorporate support for nested loops (otherwise it can’t work).
> Let us think about this a bit. We will share another draft for a more
> detail description of the approach you are suggesting asap.
>
>
> On 27 Oct 2016, at 10:55, Stephan Ewen <sewen@apache.org> wrote:
>
> How about we break this up into two FLIPs? There are after all two
> orthogonal problems (termination, fault tolerance) with quite different
> discussion states.
>
> Concerning fault tolerance, I like the ideas.
> For the termination proposal, I would like to iterate a bit more.
>
> *Termination algorithm:*
>
> My main concern here is the introduction of a termination coordinator and
> any involvement of RPC messages when deciding termination.
> That would be such a fundamental break with the current runtime
> architecture, and it would make the currently very elegant and simple model
> much more complicated and harder to maintain. Given that Flink's runtime is
> complex enough, I would really like to avoid that.
>
> The current runtime paradigm coordinates between operators strictly via
> in-band events. RPC calls happen between operators and the master for
> triggering and acknowledging execution and checkpoints.
>
> I was wondering whether we can keep following that paradigm and still get
> most of what you are proposing here. In some sense, all we need to do is
> replace RPC calls with in-band events, and "decentralize" the coordinator
> such that every operator can make its own termination decision by itself.
>
> This is only a rough sketch, you probably need to flesh it out more.
>
> - I assume that the OP in the diagram knows that it is in a loop and that
> it is the one connected to the head and tail
>
> - When OP receives and EndOfStream Event from the regular source (RS), it
> emits an "AttemptTermination" event downstream to the operators involved in
> the loop. It attaches an attempt sequence number and memorizes that
> - Tail and Head forward these events
> - When OP receives the event back with the same attempt sequence number,
> and no records came in the meantime, it shuts down and emits EndOfStream
> downstream
> - When other records came back between emitting the AttemptTermination
> event and receiving it back, then it emits a new AttemptTermination event
> with the next sequence number.
> - This should terminate as soon as the loop is empty.
>
> Might this model even generalize to nested loops, where the
> "AttemptTermination" event is scoped by the loop's nesting level?
>
> Let me know what you think!
>
>
> Best,
> Stephan
>
>
> On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <sewen@apache.org> wrote:
>
> Hi!
>
> I am still scanning it and compiling some comments. Give me a bit ;-)
>
> Stephan
>
>
> On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <parisc@kth.se> wrote:
>
> Hey all,
>
> Now that many of you have already scanned the document (judging from the
> views) maybe it is time to give back some feedback!
> Did you like it? Would you suggest an improvement?
>
> I would suggest not to leave this in the void. It has to do with
> important properties that the system promises to provide.
> Me and Fouad will do our best to answer your questions and discuss this
> further.
>
> cheers
> Paris
>
> On 21 Oct 2016, at 08:54, Paris Carbone <parisc@kth.se> wrote:
>
> Hello everyone,
>
> Loops in Apache Flink have a good potential to become a much more
> powerful thing in future version of Apache Flink.
> There is generally high demand to make them usable and first of all
> production-ready for upcoming releases.
>
> As a first commitment we would like to propose FLIP-13 for consistent
> processing with Loops.
> We are also working on scoped loops for Q1 2017 which we can share if
> there is enough interest.
>
> For now, that is an improvement proposal that solves two pending major
> issues:
>
> 1) The (not so trivial) problem of correct termination of jobs with
> iterations
> 2) The applicability of the checkpointing algorithm to iterative dataflow
> graphs.
>
> We would really appreciate it if you go through the linked draft
> (motivation and proposed changes) for FLIP-13 and point out comments,
> preferably publicly in this devlist discussion before we go ahead and
> update the wiki.
>
> https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing
>
> cheers
>
> Paris and Fouad