Posted to user@flink.apache.org by Han You <Ha...@akunacapital.com> on 2022/04/22 05:57:02 UTC

Flink batch mode does not sort by event timestamp

I have a custom Flink Source and a SerializableTimestampAssigner that assigns event timestamps to the records the source emits. The source may emit records out of order because of the nature of the underlying data storage. With BATCH mode, however, I expect Flink to sort these records by event timestamp before any operator processes them.

Excerpted from the Flink documentation <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/> on execution mode: In BATCH mode, where the input dataset is known in advance, there is no need for such a heuristic as, at the very least, elements can be sorted by timestamp so that they are processed in temporal order.

However, this doesn't seem to be the case. If I create a DataStream from the Source (StreamExecutionEnvironment.fromSource) with my timestamp assigner, and then call datastream.addSink(x => println(extractTimestamp(x))), the output isn't strictly ascending. Is my understanding of the documentation wrong? Or does Flink expect me (the user) to sort the input dataset myself?

Thanks in advance for any help!

Re: Flink batch mode does not sort by event timestamp

Posted by David Anderson <da...@apache.org>.
The DataStream API's BATCH execution mode first sorts by key, and within
each key, it sorts by timestamp. By operating this way, it only needs to
keep state for one key at a time, so this keeps the runtime simple and
efficient.
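
As a rough illustration of the ordering described above, here is a minimal sketch in plain Java. It does not use the Flink API; the Event class, the batchOrder helper and the sample data are hypothetical and exist only for this example, and the alphabetical key order is an assumption made for readability rather than a statement about how Flink orders keys internally. It shows that a list sorted by key, and by timestamp within each key, need not be ascending by timestamp overall.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class BatchOrderSketch {

    /** Hypothetical record type: a key plus an event timestamp. */
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return key + "@" + timestamp;
        }
    }

    /** Orders events by key first, then by timestamp within each key. */
    static List<Event> batchOrder(List<Event> input) {
        List<Event> sorted = new ArrayList<>(input);
        sorted.sort(Comparator
                .comparing((Event e) -> e.key)
                .thenComparingLong(e -> e.timestamp));
        return sorted;
    }

    /** Returns true if timestamps never decrease across the whole list. */
    static boolean globallyAscending(List<Event> events) {
        for (int i = 1; i < events.size(); i++) {
            if (events.get(i).timestamp < events.get(i - 1).timestamp) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Out-of-order input spread across two keys.
        List<Event> input = List.of(
                new Event("b", 3L),
                new Event("a", 5L),
                new Event("b", 1L),
                new Event("a", 2L));

        List<Event> ordered = batchOrder(input);
        System.out.println(ordered);                    // [a@2, a@5, b@1, b@3]
        System.out.println(globallyAscending(ordered)); // false
    }
}
```

Within each key the timestamps ascend (2, 5 for "a" and 1, 3 for "b"), but the sequence as a whole (2, 5, 1, 3) does not, which is consistent with a sink seeing timestamps that are not strictly ascending.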

Regards,
David

P.S. I see you also asked this question on Stack Overflow. Please try to
refrain from asking questions in both forums simultaneously -- this creates
extra work for the community, and we struggle to find the resources to keep
up with all of the questions as it is.
