Posted to user@flink.apache.org by Yun Gao <yu...@aliyun.com> on 2019/11/01 02:32:00 UTC

Re: Preserving (best effort) messages order between operators

     Hi Averell,

           If I understand correctly, the job graph is A (parallelism = 1) --> B (parallelism > 1). In that case, I think the records arriving at each subtask B_i should be in the same order in which A sent them out. Could you provide more details on the topology? Are there only these two operators? And could you also explain how you check the message order in B_i?

   Best,
   Yun
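Yun's point can be illustrated with a small simulation. The sketch below assumes a plain round-robin channel selector and one FIFO channel per subtask. This simplifies what `rebalance()` does and does not model Flink's actual partitioner or network stack. Under those assumptions, each B_i receives a subsequence of A's output, so it stays in A's order. The class `RebalanceOrder` and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class RebalanceOrder {
    // Simplified model of rebalance(): A's records go round-robin to
    // `parallelism` FIFO channels, one per downstream subtask B_i.
    static List<List<Integer>> distribute(List<Integer> records, int parallelism) {
        List<Queue<Integer>> channels = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            channels.add(new ArrayDeque<>());
        }
        int next = 0;
        for (int r : records) {
            channels.get(next).add(r);            // FIFO channel keeps arrival order
            next = (next + 1) % parallelism;      // move on to the next subtask
        }
        // Read each channel head-to-tail: this is the order B_i would see.
        List<List<Integer>> received = new ArrayList<>();
        for (Queue<Integer> channel : channels) {
            received.add(new ArrayList<>(channel));
        }
        return received;
    }

    static boolean isNonDecreasing(List<Integer> xs) {
        for (int i = 1; i < xs.size(); i++) {
            if (xs.get(i - 1) > xs.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> fromA = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            fromA.add(i);                          // A emits 0..9 in event-time order
        }
        List<List<Integer>> perSubtask = distribute(fromA, 3);
        for (int i = 0; i < perSubtask.size(); i++) {
            List<Integer> seen = perSubtask.get(i);
            System.out.println("B_" + i + " receives " + seen + ", ordered=" + isNonDecreasing(seen));
        }
    }
}
```

If this model holds, any re-ordering observed inside a single B_i has to come from B itself rather than from the channel between A and B.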


------------------------------------------------------------------
From:Averell <lv...@gmail.com>
Send Time:2019 Oct. 31 (Thu.) 12:55
To:user <us...@flink.apache.org>
Subject:Preserving (best effort) messages order between operators

Hi, 

I have a source function with parallelism = 1 that sends out records ordered
by event time. These records are then rebalanced to the next operator, which
has parallelism > 1. I observed that within each subtask of the second
operator, the order of the messages is not maintained. Is this behaviour
expected? If so, is there any way to avoid it, or at least reduce it?
I have high back-pressure on that second operator because the one after it is
slow. There is also high back-pressure on the first operator, which makes my
problem more severe (the out-of-order effect is large). If I could
throttle the first operator when back-pressure is high, I could mitigate
the problem, but I could not find any guide on how to do that.

Could you please help?

Thanks.
Averell
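One way to approximate the throttling asked about here is to cap how often A emits. Below is a minimal sketch of a fixed-rate throttle in plain Java. `FixedRateThrottle` and `acquire()` are hypothetical helpers, not a Flink API. The assumption is that A would call `acquire()` before each `ctx.collect()` in its run loop. The throttle caps the emission rate unconditionally; it does not react to measured back-pressure, which is a simplifying assumption.

```java
// Hypothetical helper (not part of Flink): spaces out calls to acquire() so that
// at most `permitsPerSecond` calls return per second.
public class FixedRateThrottle {
    private final long intervalNanos;
    private long nextFreeNanos;

    public FixedRateThrottle(double permitsPerSecond) {
        if (permitsPerSecond <= 0) {
            throw new IllegalArgumentException("permitsPerSecond must be > 0");
        }
        this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
        this.nextFreeNanos = System.nanoTime();
    }

    // Blocks until the next emission slot is reached, then reserves the following slot.
    public void acquire() {
        long now = System.nanoTime();
        long waitNanos = nextFreeNanos - now;
        if (waitNanos > 0) {
            try {
                Thread.sleep(waitNanos / 1_000_000L, (int) (waitNanos % 1_000_000L));
            } catch (InterruptedException e) {
                // Preserve the interrupt flag (e.g. on job cancellation) and return early.
                Thread.currentThread().interrupt();
                return;
            }
        }
        nextFreeNanos = Math.max(nextFreeNanos, now) + intervalNanos;
    }

    public static void main(String[] args) {
        FixedRateThrottle throttle = new FixedRateThrottle(20.0);  // at most 20 emissions/s
        long start = System.nanoTime();
        for (int i = 0; i < 5; i++) {
            throttle.acquire();
            // In A's run loop, the ctx.collect(...) call would go here.
            System.out.printf("emit %d at %d ms%n", i, (System.nanoTime() - start) / 1_000_000L);
        }
    }
}
```

A fixed interval is simpler than a token bucket, but it also caps throughput when there is no back-pressure at all.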



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Preserving (best effort) messages order between operators

Posted by Averell <lv...@gmail.com>.
Hi Yun,

I found the cause of the issue.
The ContinuousFileReaderOperator (my operator B) uses a PriorityQueue
that keeps its buffer sorted by modTime, so my records were re-ordered.
I don't understand the reason for using a PriorityQueue instead of an
ordinary Queue, though.

Thanks.
Averell 
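This finding can be reproduced in a few lines of plain Java (Java 16+ for `record`). The sketch uses a hypothetical `Split` record that carries only a partition date (`dt`) and a modification time (`modTime`). It is a simplified model, not Flink's actual split class or reader code, and the `modTime` values are illustrative. Once several splits are buffered at the same time, a `PriorityQueue` ordered by `modTime` hands them out in modification-time order, while a FIFO `ArrayDeque` preserves arrival order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

public class SplitQueueOrder {
    // Hypothetical, simplified stand-in for a file split: partition date + file modification time.
    record Split(String dt, long modTime) {}

    // Buffer all splits (as if they piled up under back-pressure), then drain the
    // queue and return the dt values in the order the reader would process them.
    static List<String> drain(Queue<Split> queue, List<Split> arrivals) {
        queue.addAll(arrivals);
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().dt());
        }
        return order;
    }

    public static void main(String[] args) {
        // A forwards splits ordered by dt, but the files' modification times do not follow dt.
        List<Split> fromA = List.of(
            new Split("2017-12-15", 300L),
            new Split("2017-12-16", 100L),
            new Split("2017-12-17", 200L));

        Queue<Split> fifo = new ArrayDeque<>();
        Queue<Split> byModTime = new PriorityQueue<>(Comparator.comparingLong(Split::modTime));

        System.out.println("FIFO:          " + drain(fifo, fromA));      // arrival (dt) order
        System.out.println("PriorityQueue: " + drain(byModTime, fromA)); // modTime order
    }
}
```

The model also suggests why the effect grows with back-pressure. If B drains each split before the next one arrives, both queues behave the same. The further B lags behind, the more splits sit in the queue together and get re-sorted.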




Re: Preserving (best effort) messages order between operators

Posted by Huyen Levan <lv...@gmail.com>.
Hi Yun,

My job graph is: (A: 1) -(rebalance)-> (B: 32) -(hash)-> (C: 32). A lists
files and forwards them to B as FileInputSplits. B parses those files and
shuffles the data records to C as a keyed stream.
C is the slowest operator in the graph; A is the fastest.

I relied on the slf4j/logback logs to reach that conclusion. There is one
log entry for each context.collect() call in A, and one log entry
whenever B opens a new FileInputSplit (B is Flink's
ContinuousFileReaderOperator).
My logback pattern is:

    <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>

The logs from A showed the messages in order (by dt in my case).
However, the logs from B showed that the order was lost (see the logs
below). I assume that each logback %thread corresponds to exactly
one B_i.

Thanks and regards,
Averell

2019-10-30 05:30:43.548 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-12/part-00119-2dd7fe37-5e1b-4bc7-8bc4-fc632b419ac0
2019-10-30 05:30:51.239 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-13/part-00001-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:06.537 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-13/part-00083-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:13.611 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-13/part-00159-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:20.826 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-14/part-00041-c4b2a37e-066d-4adb-b610-a714e7b45b8b
2019-10-30 05:31:28.487 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-14/part-00121-c4b2a37e-066d-4adb-b610-a714e7b45b8b
2019-10-30 05:31:35.806 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-15/part-00001-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:31:42.739 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-15/part-00081-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:31:49.861 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-01/part-00045-1dc6388b-b72c-4bcd-a337-35c371b583f6
2019-10-30 05:31:55.834 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-01/part-00130-1dc6388b-b72c-4bcd-a337-35c371b583f6
2019-10-30 05:32:02.097 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-15/part-00161-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:32:06.452 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-02/part-00000-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:11.379 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-02/part-00077-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:16.103 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-02/part-00147-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:21.025 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-16/part-00039-d12ed910-d58b-46b2-b607-784ebf1266d4
2019-10-30 05:32:25.758 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-03/part-00043-92a58007-0c35-479b-b9e5-6663fae4e71c
2019-10-30 05:32:30.156 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-03/part-00123-92a58007-0c35-479b-b9e5-6663fae4e71c
2019-10-30 05:32:34.169 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-16/part-00121-d12ed910-d58b-46b2-b607-784ebf1266d4
2019-10-30 05:32:39.462 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-04/part-00001-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:43.551 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-04/part-00085-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:48.100 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-04/part-00166-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:52.629 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-17/part-00001-491d8c85-7eb2-48c7-af06-501934f65a83
2019-10-30 05:32:57.834 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-05/part-00045-19080414-962a-455c-b342-fcf3e36f1cc5
2019-10-30 05:33:01.943 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-05/part-00113-19080414-962a-455c-b342-fcf3e36f1cc5
2019-10-30 05:33:06.871 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-17/part-00082-491d8c85-7eb2-48c7-af06-501934f65a83
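The per-thread order check described in this message can be automated. The sketch below is a hypothetical helper, not part of Flink or logback. It parses lines in the log format shown above and groups them by the `[%thread]` field, assuming, as Averell does, that one thread corresponds to one B_i. It then reports every `dt` opened after a later `dt` had already been seen on the same thread. Because the dates are ISO-formatted, plain string comparison orders them correctly.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DtOrderCheck {
    // Matches "[Thread-34] ... Opening file /dt=2017-12-13/..." in the log format above.
    private static final Pattern LINE =
        Pattern.compile("\\[(?<thread>[^\\]]+)\\].*Opening file /dt=(?<dt>\\d{4}-\\d{2}-\\d{2})/");

    // For each thread (assumed to be one B_i), list the dt values that were opened
    // after a later dt had already been seen, i.e. the out-of-order entries.
    static Map<String, List<String>> outOfOrder(List<String> logLines) {
        Map<String, String> maxSeen = new LinkedHashMap<>();
        Map<String, List<String>> late = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = LINE.matcher(line);
            if (!m.find()) {
                continue;                            // skip unrelated log lines
            }
            String thread = m.group("thread");
            String dt = m.group("dt");               // ISO dates sort correctly as strings
            late.putIfAbsent(thread, new ArrayList<>());
            String max = maxSeen.get(thread);
            if (max != null && dt.compareTo(max) < 0) {
                late.get(thread).add(dt);
            } else {
                maxSeen.put(thread, dt);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Three consecutive entries taken from the log above.
        List<String> sample = List.of(
            "2019-10-30 05:31:55.834 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-01/part-00130-1dc6388b-b72c-4bcd-a337-35c371b583f6",
            "2019-10-30 05:32:02.097 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-15/part-00161-3830100b-611e-455d-b6f9-9bce78ca5139",
            "2019-10-30 05:32:06.452 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-02/part-00000-4ef9a43f-d0de-412c-9a3f-01f990cee55f");
        System.out.println(outOfOrder(sample));   // {Thread-34=[2017-12-15]}
    }
}
```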
