Posted to issues@flink.apache.org by "Elias Levy (JIRA)" <ji...@apache.org> on 2016/09/01 16:47:20 UTC

[jira] [Commented] (FLINK-4558) Add support for synchronizing streams

    [ https://issues.apache.org/jira/browse/FLINK-4558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455966#comment-15455966 ] 

Elias Levy commented on FLINK-4558:
-----------------------------------

Note that Flink already performs a similar function when handling snapshot barriers: when an operator with multiple input streams receives a barrier on one stream, it pauses that stream so as to align the two streams at the barrier and generate a snapshot.
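
To make the analogy concrete, here is a toy model of that alignment for a two-input operator. It is only a sketch under my own assumptions: BarrierAligner and its methods are hypothetical names made up for illustration, not Flink's internal classes, and a real implementation has to deal with concerns this ignores.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of barrier alignment on a two-input operator.
// Hypothetical names throughout; this is not Flink's internal code.
class BarrierAligner {
    // blocked[i] is true while input i is paused waiting for the other barrier.
    private final boolean[] blocked = new boolean[2];
    private final List<String> log = new ArrayList<>();

    /** Returns true if the given input (0 or 1) is currently paused. */
    boolean isBlocked(int input) {
        return blocked[input];
    }

    /** A regular element arrives; callers must not feed a paused input. */
    void onElement(int input, String element) {
        if (blocked[input]) {
            throw new IllegalStateException("input " + input + " is paused");
        }
        log.add("element:" + element);
    }

    /** A barrier arrives: pause this input until the other barrier shows up. */
    void onBarrier(int input) {
        blocked[input] = true;
        if (blocked[0] && blocked[1]) {
            // Both inputs are aligned at the barrier: snapshot, then resume both.
            log.add("snapshot");
            blocked[0] = false;
            blocked[1] = false;
        }
    }

    /** Ordered record of what the operator did, for inspection. */
    List<String> log() {
        return log;
    }
}
```

The point is that the operator already decides, per input, when that input may make progress, which is the same kind of control the synchronization feature would need.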

> Add support for synchronizing streams
> -------------------------------------
>
>                 Key: FLINK-4558
>                 URL: https://issues.apache.org/jira/browse/FLINK-4558
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.1.0
>            Reporter: Elias Levy
>
> As mentioned on the [mailing list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/synchronizing-two-streams-td6830.html], there are use cases that require synchronizing two streams by their times, where it is not practical to buffer all messages from one stream while waiting for the other to synchronize.  Flink should add functionality to enable such use cases.
> This could be implemented by modifying TwoInputStreamOperator so that calls to processElement1 and processElement2 can return a value indicating that the element can't yet be processed. The framework would then pause processing for some time, potentially using exponential backoff with a hard maximum, and then allow the back pressure system to do its work and pause the stream.
> Alternatively, an API could be added to explicitly pause/unpause a stream.
> For ease of use, either of these mechanisms should be used to create a SynchronizedTwoInputStreamOperator that end users can utilize by passing a configurable time delta to use as a synchronization threshold.
>  
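
A rough sketch of the decision logic described above, independent of Flink. Everything in it is hypothetical: StreamSynchronizer, Decision, offer and backoffMillis are names invented for illustration, and none of them exist in Flink. It assumes timestamps are non-decreasing within each input and that the framework re-offers an element after a RETRY_LATER result.

```java
// Hypothetical sketch of the proposed mechanism; none of these types exist in Flink.
class StreamSynchronizer {
    /** Stand-in for the value processElement1/processElement2 might return. */
    enum Decision { PROCESS, RETRY_LATER }

    private final long maxDeltaMillis;
    // Highest timestamp offered so far on each input, including held elements.
    private final long[] head = new long[2];
    private final boolean[] seen = new boolean[2];

    StreamSynchronizer(long maxDeltaMillis) {
        this.maxDeltaMillis = maxDeltaMillis;
    }

    /** Decides whether an element from input 0 or 1 may be processed now. */
    Decision offer(int input, long timestamp) {
        int other = 1 - input;
        head[input] = seen[input] ? Math.max(head[input], timestamp) : timestamp;
        seen[input] = true;
        if (!seen[other]) {
            // Nothing to compare against yet: hold until the other input shows up.
            return Decision.RETRY_LATER;
        }
        // Hold the element if this input runs ahead of the other by more than the delta.
        return timestamp - head[other] <= maxDeltaMillis
                ? Decision.PROCESS
                : Decision.RETRY_LATER;
    }

    /** Exponential backoff with a hard maximum; assumes 0 < baseMillis <= capMillis. */
    static long backoffMillis(int attempt, long baseMillis, long capMillis) {
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < capMillis; i++) {
            // Jump to the cap instead of doubling past it (also avoids overflow).
            delay = delay > capMillis / 2 ? capMillis : delay * 2;
        }
        return delay;
    }
}
```

Because each input's head is compared against the other's, the input that is behind is always allowed to proceed, so the two inputs cannot both be held indefinitely once each has offered an element. After a RETRY_LATER, the framework would wait backoffMillis(attempt, ...) before re-offering the element, and back pressure would take over once the delay reaches the cap.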



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)