Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/02/18 18:35:18 UTC

[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

    [ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15152692#comment-15152692 ] 

ASF GitHub Bot commented on FLINK-3257:
---------------------------------------

GitHub user senorcarbone opened a pull request:

    https://github.com/apache/flink/pull/1668

    [FLINK-3257] Add Exactly-Once Processing Guarantees for Iterative DataStream Jobs

    # **[WIP]**
    This is a first version of the snapshot algorithm adapted to support iterations. It is correct and works in practice, as long as there is enough memory for its logging requirements; I am working on that, hopefully with a little help from you. Before going into the implementation details, let me briefly describe the new algorithm.
    
    ## Algorithm
    
    Our existing checkpoint algorithm has a very fluid and straightforward protocol. It simply makes sure that checkpoint barriers are aligned in each operator, so that all records that precede the barriers (pre-shot) are processed before a snapshot is taken. Within a cycle of the execution graph, however, waiting indefinitely for all in-transit records can violate termination (a crucial liveness property), so instead we have to save any unprocessed records as part of the snapshot and process them later. In this take on the algorithm in Flink, we assign that role to the `Iteration Head`. This version differs from the vanilla algorithm only in the following steps:
    
    1. An `Iteration Head` receives a barrier from the system runtime (as before) and:
    
    - Goes into **Logging Mode**. From that point on, every record it receives from its `Iteration Sink` is buffered in its own operator state and **not** forwarded further until it returns to normal mode.
    - **Forwards** the barrier to its downstream nodes (this guarantees liveness; otherwise the job would deadlock).
    
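    2. Eventually, the `Iteration Head` receives the same barrier back from its `Iteration Sink` and:

    - **Finalizes** its snapshot, including all the records it has logged.
    - **Flushes** the logged records to its downstream nodes in FIFO order and goes back to normal mode.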
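
A minimal sketch of the `Iteration Head` behaviour described above, written as a single-threaded Python model rather than Flink code. The class and method names (`IterationHeadSketch`, `on_runtime_barrier`, `on_feedback_record`, `on_feedback_barrier`) are hypothetical, and the model assumes only one checkpoint is in flight at a time and a FIFO back-edge. For the barrier's return trip it follows steps 2 and 3 of the issue description quoted further down: keep the log in the snapshot, then emit the logged records in FIFO order.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Deque, Dict, List, Optional


@dataclass(frozen=True)
class Barrier:
    checkpoint_id: int


@dataclass
class IterationHeadSketch:
    """Hypothetical model of an Iteration Head; not Flink's actual API."""
    output: List[Any] = field(default_factory=list)                # emitted downstream, in order
    logging_for: Optional[int] = None                              # checkpoint id while in Logging Mode
    log: Deque[Any] = field(default_factory=deque)                 # back-edge records held back
    snapshots: Dict[int, List[Any]] = field(default_factory=dict)  # checkpoint id -> logged records

    def on_runtime_barrier(self, barrier: Barrier) -> None:
        # Enter Logging Mode and forward the barrier immediately, so it can
        # travel around the loop instead of waiting on the back-edge (deadlock).
        self.logging_for = barrier.checkpoint_id
        self.output.append(barrier)

    def on_feedback_record(self, record: Any) -> None:
        if self.logging_for is not None:
            self.log.append(record)      # Logging Mode: buffered, not forwarded
        else:
            self.output.append(record)   # normal mode: pass through

    def on_feedback_barrier(self, barrier: Barrier) -> None:
        # The barrier has come back from the Iteration Sink, so every record
        # that was in transit on the back-edge before it is now in the log.
        if barrier.checkpoint_id != self.logging_for:
            raise ValueError(f"unexpected barrier {barrier.checkpoint_id}")
        self.snapshots[barrier.checkpoint_id] = list(self.log)  # log becomes part of the snapshot
        while self.log:
            self.output.append(self.log.popleft())              # flush in FIFO order
        self.logging_for = None                                 # back to normal mode


head = IterationHeadSketch()
head.on_feedback_record("a")          # normal mode: forwarded
head.on_runtime_barrier(Barrier(1))
head.on_feedback_record("b")          # logged
head.on_feedback_record("c")          # logged
head.on_feedback_barrier(Barrier(1))
head.on_feedback_record("d")          # normal mode again
print(head.output)     # ['a', Barrier(checkpoint_id=1), 'b', 'c', 'd']
print(head.snapshots)  # {1: ['b', 'c']}
```

Forwarding the barrier before the loop has drained is what keeps the protocol live; the price is that the head must hold the logged records in memory until the barrier returns, which matches the memory concern raised at the top of this PR.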
     
    
    ## Example
    
    blabla
    
    ![ftloops-topology](https://cloud.githubusercontent.com/assets/858078/13151679/7f150538-d66b-11e5-98c8-7bbe2243b810.png)
    
    
    blabla
    
    ![diagram](https://cloud.githubusercontent.com/assets/858078/13151664/7361a638-d66b-11e5-94e9-64f70a8130d7.png)
    
    ## Current Implementation Details
    
    ## Open/Pending Issues
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/senorcarbone/flink ftloops

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1668.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1668
    
----
commit 38256e4c4bb00183794699027e8e4298787c66fa
Author: Paris Carbone <pa...@kth.se>
Date:   2016-01-19T10:18:54Z

    exactly-once processing test for stream iterations

commit dbf2625536289dc70724ef798fd02989e586d874
Author: Paris Carbone <pa...@kth.se>
Date:   2016-02-18T15:49:19Z

    [wip] adapt snapshot mechanism for iterative jobs

----


> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> -------------------------------------------------------------------
>
>                 Key: FLINK-3257
>                 URL: https://issues.apache.org/jira/browse/FLINK-3257
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Paris Carbone
>            Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution graph. An alternative scheme can potentially include records in-transit through the back-edges of a cyclic execution graph (ABS [1]) to achieve the same guarantees.
> One straightforward implementation of ABS for cyclic graphs could work along the following lines:
> 1) When the TaskManager triggers a barrier in an IterationHead, start blocking its output and start an upstream backup of all records forwarded from the respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource should finalize the snapshot, unblock its output, emit all in-transit records in FIFO order, and continue its usual execution.
> --
> Upon restart, the IterationSource should first emit all records from the injected snapshot and then continue its usual execution.
> Several optimisations and slight variations are possible, but this can serve as the initial implementation.
> [1] http://arxiv.org/abs/1506.08603
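
To see why the backed-up records must be part of the snapshot, here is a toy end-to-end check of the scheme above, including the restart rule. It is a hypothetical single-threaded Python model, not Flink code: `CyclicJob`, its methods and the dict-based snapshot are invented for illustration, and as a simplification external input enters the loop through the head (acting as the IterationSource). The body adds every value to a running total and feeds `value - 1` back while it is positive, so any lost or duplicated record changes the final total.

```python
from collections import deque
from dataclasses import dataclass
from typing import Deque, List, Optional, Union


@dataclass(frozen=True)
class Barrier:
    checkpoint_id: int


Item = Union[int, Barrier]


class CyclicJob:
    """Toy loop head -> body -> back-edge -> head (hypothetical, not Flink)."""

    def __init__(self, inputs: List[int]) -> None:
        self.inputs = inputs
        self.offset = 0                          # head: next external input to emit
        self.total = 0                           # body: operator state
        self.to_body: Deque[Item] = deque()      # head -> body channel
        self.back_edge: Deque[Item] = deque()    # body (sink side) -> head channel
        self.logging = False
        self.log: List[int] = []
        self.pending: Optional[dict] = None      # snapshot being assembled
        self.completed: Optional[dict] = None    # last completed snapshot

    def trigger_checkpoint(self, cid: int) -> None:
        # Step 1: snapshot the head's own state, start the backup, forward the barrier.
        self.pending = {"id": cid, "offset": self.offset}
        self.logging, self.log = True, []
        self.to_body.append(Barrier(cid))

    def head_step(self) -> bool:
        if self.back_edge:
            item = self.back_edge.popleft()
            if isinstance(item, Barrier):
                # Step 3: barrier is back; the log now holds all in-transit records.
                self.pending["log"] = list(self.log)
                self.completed, self.pending = self.pending, None
                self.to_body.extend(self.log)    # unblock: emit logged records in FIFO order
                self.logging, self.log = False, []
            elif self.logging:
                self.log.append(item)            # in transit: back up, do not forward
            else:
                self.to_body.append(item)
            return True
        if self.offset < len(self.inputs):
            self.to_body.append(self.inputs[self.offset])
            self.offset += 1
            return True
        return False

    def body_step(self) -> bool:
        if not self.to_body:
            return False
        item = self.to_body.popleft()
        if isinstance(item, Barrier):
            self.pending["total"] = self.total   # body snapshot (single input, no alignment)
            self.back_edge.append(item)          # Step 2: barrier travels back to the head
        else:
            self.total += item
            if item > 1:
                self.back_edge.append(item - 1)
        return True

    def run(self, max_steps: Optional[int] = None) -> int:
        steps = 0
        while max_steps is None or steps < max_steps:
            if not (self.head_step() | self.body_step()):  # '|' runs both steps
                break
            steps += 1
        return self.total

    @classmethod
    def restore(cls, inputs: List[int], snap: dict) -> "CyclicJob":
        job = cls(inputs)
        job.offset, job.total = snap["offset"], snap["total"]
        job.to_body.extend(snap["log"])          # restart: emit snapshotted records first
        return job


reference = CyclicJob([3, 2]).run()              # failure-free run
job = CyclicJob([3, 2])
job.run(max_steps=2)
job.trigger_checkpoint(1)
job.run(max_steps=3)                             # checkpoint 1 completes, then the job "crashes"
snap = job.completed
recovered = CyclicJob.restore([3, 2], snap).run()
without_log = CyclicJob.restore([3, 2], {**snap, "log": []}).run()
print(reference, recovered, without_log)         # 9 9 8
```

Dropping the log from the snapshot loses the record that was on the back-edge when the checkpoint was taken, so the recovered total falls short; restoring the log first gives the same result as the failure-free run.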



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)