You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Martin Eden <ma...@gmail.com> on 2017/09/29 12:01:37 UTC

EASY Friday afternoon question: order of chained sink operator execution in a streaming task

Hi all,

Just a quick one.

I have a task that looks like this (as printed in the logs):

17-09-29 0703510695 INFO TaskManager.info: Received task Co-Flat Map ->
Process -> (Sink: sink1, Sink: sink2, Sink: sink3) (2/2)

After looking a bit at the code of the streaming task I suppose the sink
operators are chained for each subtask and synchronously executed one after
the other in the order I specified them in code (which does correspond to
the order in the log message).

A particular subtask does something like this on one thread (just focusing
on sinks):

time record     invocations
0      record1   sink1.invoke(record1)
1                     sink2.invoke(record1)
2                     sink3.invoke(record1)
3      record2   sink1.invoke(record2)
4                     sink2.invoke(record2)
5                     sink3.invoke(record2)
.
.
.

Is that correct?

Thanks

Re: EASY Friday afternoon question: order of chained sink operator execution in a streaming task

Posted by Chesnay Schepler <ch...@apache.org>.
Yes, i believe that is correct.

On 29.09.2017 14:01, Martin Eden wrote:
> Hi all,
>
> Just a quick one.
>
> I have a task that looks like this (as printed in the logs):
>
> 17-09-29 0703510695 INFO TaskManager.info: Received task Co-Flat Map 
> -> Process -> (Sink: sink1, Sink: sink2, Sink: sink3) (2/2)
>
> After looking a bit at the code of the streaming task I suppose the 
> sink operators are chained for each subtask and synchronously executed 
> one after the other in the order I specified them in code (which does 
> correspond to the order in the log message).
>
> A particular subtask does something like this on one thread (just 
> focusing on sinks):
>
> time record     invocations
> 0      record1   sink1.invoke(record1)
> 1                     sink2.invoke(record1)
> 2                     sink3.invoke(record1)
> 3      record2   sink1.invoke(record2)
> 4                     sink2.invoke(record2)
> 5                     sink3.invoke(record2)
> .
> .
> .
>
> Is that correct?
>
> Thanks
>