Posted to issues@flink.apache.org by "Kostas Kloudas (JIRA)" <ji...@apache.org> on 2017/10/20 08:50:00 UTC

[jira] [Commented] (FLINK-7666) ContinuousFileReaderOperator swallows chained watermarks

    [ https://issues.apache.org/jira/browse/FLINK-7666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212378#comment-16212378 ] 

Kostas Kloudas commented on FLINK-7666:
---------------------------------------

The problem was that {{quiesceAndAwaitPending()}} of the {{TimerService}} was called before the reader's {{close()}}.

This means that with a periodic watermark emitter and a small file (_e.g._ a single split), the timer service was shut down as soon as the reader received its first split descriptor, before any reading had started. From that point on no timers could be registered, so no watermarks were emitted.
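
To illustrate the ordering issue, here is a minimal toy model in plain Java. It does not use Flink's classes: {{ToyTimerService}}, {{register()}}, {{fireAll()}}, {{quiesce()}} and {{watermarksEmitted()}} are hypothetical names made up for this sketch, and the "fixed" order is only what the description above implies, not a copy of the actual patch.

```java
import java.util.ArrayList;
import java.util.List;

public class TimerShutdownOrder {

    /** Hypothetical stand-in for a timer service; NOT Flink's TimerService. */
    static class ToyTimerService {
        private final List<Runnable> timers = new ArrayList<>();
        private boolean quiesced = false;

        /** Accepts a timer unless the service has already been quiesced. */
        boolean register(Runnable callback) {
            if (quiesced) {
                return false; // too late: registration is silently dropped
            }
            timers.add(callback);
            return true;
        }

        /** Simulates processing time passing: runs and clears registered timers. */
        void fireAll() {
            List<Runnable> due = new ArrayList<>(timers);
            timers.clear();
            for (Runnable r : due) {
                r.run();
            }
        }

        /** Stops accepting new timers (toy version of quiescing). */
        void quiesce() {
            quiesced = true;
        }
    }

    /** Returns how many watermarks the toy periodic emitter managed to emit. */
    static int watermarksEmitted(boolean quiesceBeforeReading) {
        ToyTimerService timers = new ToyTimerService();
        int[] emitted = {0};

        if (quiesceBeforeReading) {
            timers.quiesce(); // problematic order: quiesce before the split is read
        }

        // The single split is read; the periodic emitter tries to schedule itself.
        timers.register(() -> emitted[0]++);
        timers.fireAll();

        // The reader would be closed here.
        if (!quiesceBeforeReading) {
            timers.quiesce(); // order implied by the description: quiesce afterwards
        }
        return emitted[0];
    }

    public static void main(String[] args) {
        System.out.println("quiesce before reading: " + watermarksEmitted(true));
        System.out.println("quiesce after reading:  " + watermarksEmitted(false));
    }
}
```

In this toy model, quiescing first means the emitter's timer is rejected and nothing is emitted, while quiescing after the reader is done lets the timer register and fire once.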

> ContinuousFileReaderOperator swallows chained watermarks
> --------------------------------------------------------
>
>                 Key: FLINK-7666
>                 URL: https://issues.apache.org/jira/browse/FLINK-7666
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming Connectors
>    Affects Versions: 1.3.2
>            Reporter: Ufuk Celebi
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> I use event time and read from a (finite) file. I assign watermarks right after the {{ContinuousFileReaderOperator}} with parallelism 1.
> {code}
> env
>   .readFile(new TextInputFormat(...), ...)
>   .setParallelism(1)
>   .assignTimestampsAndWatermarks(...)
>   .setParallelism(1)
>   .map()...
> {code}
> The watermarks I assign never progress through the pipeline.
> I can work around this by inserting a {{shuffle()}} after the file reader or starting a new chain at the assigner:
> {code}
> env
>   .readFile(new TextInputFormat(...), ...)
>   .setParallelism(1)
>   .shuffle() 
>   .assignTimestampsAndWatermarks(...)
>   .setParallelism(1)
>   .map()...
> {code}


