Posted to dev@flink.apache.org by Dominik Wosiński <wo...@gmail.com> on 2020/11/23 13:24:21 UTC

Temporal Table Ordering

Hey,
I have a question about the ordering of messages in a temporal table.
For one of my jobs I can observe that the order of the input is correct,
but the order of the output is not.
Say I have two streams that both have an *id* field, which is used both
for the join and for Kafka partitioning. Let's call the streams A and B,
where stream B is used to create a temporal table function.
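
For reference, the setup looks roughly like this; a minimal sketch in the
Java Table API, where the table and field names (A, B, ts, id, payload)
are placeholders for my actual ones, and both tables are assumed to be
registered with an event-time attribute:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import static org.apache.flink.table.api.Expressions.$;

public class TemporalJoinSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Stream B becomes the temporal table function, versioned by the
        // event-time attribute "ts" and keyed by "id" (placeholder names).
        TemporalTableFunction bHistory = tEnv
                .from("B")
                .createTemporalTableFunction($("ts"), $("id"));
        // Temporal table functions are registered via the deprecated
        // registerFunction call.
        tEnv.registerFunction("BHistory", bHistory);

        // Probe stream A is joined against the version of B that was
        // valid at each A row's timestamp.
        Table joined = tEnv.sqlQuery(
                "SELECT a.id, a.ts, b.payload " +
                "FROM A AS a, LATERAL TABLE (BHistory(a.ts)) AS b " +
                "WHERE a.id = b.id");
    }
}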

I have added logging for all elements that are deserialized and serialized
by the job, so I can see the following situation:

*Deserializing B with id = 1 and timestamp = 10*
*Deserializing A with id = 1 and timestamp = 20*
*Deserializing A with id = 1 and timestamp = 30*
*Deserializing A with id = 1 and timestamp = 40*
*[Other messages that cause watermarks to be pushed]*

But the logging from the serialization schema shows:

*Serializing Joined with id = 1 and timestamp = 30*
*Serializing Joined with id = 1 and timestamp = 20*
*Serializing Joined with id = 1 and timestamp = 10*
*Serializing Joined with id = 1 and timestamp = 40*
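
For completeness, the logging is just a thin wrapper around the
deserialization schema, roughly like this (a minimal sketch; the wrapper
name and log format are placeholders, assuming a DeserializationSchema-based
Kafka source; the serialization side is wrapped analogously):

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;

// Delegates to an inner schema and logs every record as it is deserialized.
public class LoggingDeserializationSchema<T> implements DeserializationSchema<T> {
    private final DeserializationSchema<T> inner;

    public LoggingDeserializationSchema(DeserializationSchema<T> inner) {
        this.inner = inner;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        T record = inner.deserialize(message);
        System.out.println("Deserializing " + record);
        return record;
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return inner.isEndOfStream(nextElement);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return inner.getProducedType();
    }
}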

The input data is properly ordered and properly partitioned on Kafka. Is
there any known issue that might be causing this? I have virtually run out
of ideas about why I might be observing this behavior.

I would be glad for any help.
Best Regards,
Dom.

Re: Temporal Table Ordering

Posted by Till Rohrmann <tr...@apache.org>.
Hi Dominik,

Are you using Flink's Table API or SQL? If so, could you share the program
with us so we can see exactly what your user program is doing? It would
also help if you could provide some example data for reproducing the
problem. Moreover, which Flink version are you running?

Cheers,
Till
