Posted to user@flink.apache.org by Theodor Wübker <th...@inside-m2m.de> on 2023/02/12 08:25:45 UTC

Non-Determinism in Table-API with Kafka and Event Time

Hey everyone,

I am seeing non-determinism in my Table API program at the moment and (as a relatively inexperienced Flink and Kafka user) I can’t really explain to myself why it happens. I have a topic with 10 partitions and a bit of data on it. I run a simple SELECT * query on it that moves some attributes around and writes everything to another topic with 10 partitions. Then, on this second topic, I run an event-time windowed query. This is where the non-determinism shows up: the results of the windowed query differ with every execution.
I thought this might be because the SELECT query wrote the data to the partitioned topic without keys. So I tried again with the same key I used for the original topic, which resulted in the exact same topic structure. Now when I run the event-time windowed query, I get incorrect results (too few result entries).

I have already read a lot of the docs on this and can’t seem to figure it out. I would much appreciate it if someone could shed a bit of light on this. Is there anything in particular I should be aware of when reading partitioned topics and running an event-time query on them? Thanks :)


Best,
Theo

Re: Non-Determinism in Table-API with Kafka and Event Time

Posted by Theodor Wübker <th...@inside-m2m.de>.
Hey Hector,

thanks for your reply. Your assumption is entirely correct: I already have a few million records on the topic to test a streaming use case. I am planning to test with a variety of settings, but the problems occur with any cluster configuration, for example parallelism 1 with 1 TaskManager and 1 slot. I plan to scale up to 10 slots and parallelism 10 for testing purposes.

I do not think that any events are being held back, since the output always contains windows with the latest timestamp (but not enough of them; there should be many more). Nevertheless, I will try your suggestion.

Maybe my configuration is wrong? The only out-of-orderness-related thing I have configured is the watermark, in the way I described previously. The docs [1] mention per-Kafka-partition watermarks; perhaps this would help me? Sadly, they do not say how to activate it.

Best,
Theo

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
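
To illustrate what per-partition watermarking changes, here is a minimal sketch in plain Python. It is not Flink code and does not show how to enable the feature; the function name, the sample data, and the simplified late-record rule (a record is dropped once the watermark has reached the end of its window) are assumptions made for illustration. Each partition tracks its own maximum timestamp, and the overall watermark follows the slowest partition, so the interleaving of partitions no longer decides which records count as late.

```python
from collections import Counter


def per_partition_counts(arrivals, num_partitions, size=3600, delay=10):
    """Count records per tumbling window using per-partition watermarks.

    arrivals: (partition, timestamp) pairs in arrival order, timestamps in seconds.
    Returns (counts per window start, number of dropped late records).
    """
    max_ts = [float("-inf")] * num_partitions
    counts = Counter()
    dropped = 0
    watermark = float("-inf")
    for partition, ts in arrivals:
        start = ts - ts % size
        if start + size <= watermark:
            dropped += 1  # window already closed: record is late
        else:
            counts[start] += 1
        max_ts[partition] = max(max_ts[partition], ts)
        # The overall watermark follows the slowest partition.
        watermark = min(max_ts) - delay
    return dict(counts), dropped


# Two partitions, each internally ordered, read in two different interleavings.
in_order = [(0, 100), (0, 200), (0, 300), (1, 4000), (1, 4100)]
interleaved = [(1, 4000), (0, 100), (1, 4100), (0, 200), (0, 300)]

print(per_partition_counts(in_order, 2))
print(per_partition_counts(interleaved, 2))
```

Both calls accept all five records. Note that in this sketch a partition that never receives data keeps the minimum at negative infinity, so the watermark never advances; Flink has a source idleness setting for that situation (`table.exec.source.idle-timeout` in the Table API configuration), which may be worth checking when some partitions are empty.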




Re: Non-Determinism in Table-API with Kafka and Event Time

Posted by Hector Rios <oj...@gmail.com>.
Hi Theo

In your initial email, you mentioned that you have "a bit of Data on it"
when referring to your topic with ten partitions. Correct me if I'm wrong,
but that sounds like the data in your topic is bounded and you are trying
to test a streaming use case. What parallelism do you have configured for
this job? Have you set the number of slots per task manager?

I've seen varying results based on the amount of parallelism configured on
a job. In the end, it usually boils down to the fact that events might be
ingested into Flink out of order. If the event time of an event is earlier
than the current watermark, the event might be discarded unless you've
configured some level of out-of-orderness. Even with out-of-orderness
configured, if your data is bounded, you might have events with later event
times arriving earlier, which then remain in state waiting for the
watermark to progress. As you can imagine, if there are no more events,
those records stay on hold.

As a test, after all your events have been ingested from the topic, try
producing one last event with an event time one or two hours later than
your latest event and see if the missing results show up.
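
The effect can be sketched with a small plain-Python simulation. This is not Flink code: the function name and sample timestamps are made up, and the firing rule is simplified so that a window is emitted once the watermark reaches its end. It uses 1-hour tumbling windows and a 10-second watermark delay, and shows the last window staying in state until one later event pushes the watermark forward.

```python
from collections import defaultdict

HOUR = 3600  # tumbling window size in seconds
DELAY = 10   # watermark delay in seconds


def run(timestamps):
    """Return (emitted window counts, windows still held in state)."""
    open_windows = defaultdict(int)  # window start -> record count
    emitted = {}
    watermark = float("-inf")
    for ts in timestamps:
        start = ts - ts % HOUR
        if start + HOUR <= watermark:
            continue  # late record: its window has already fired
        open_windows[start] += 1
        watermark = max(watermark, ts - DELAY)
        # Emit every window whose end the watermark has reached.
        for s in sorted(open_windows):
            if s + HOUR <= watermark:
                emitted[s] = open_windows.pop(s)
    return emitted, dict(open_windows)


events = [100, 1800, 3500, 3700, 5000]
print(run(events))            # the window starting at 3600 is still on hold
print(run(events + [12200]))  # one event two hours later releases it
```

In the first call only the window starting at 0 is emitted; the window starting at 3600 holds two records that never appear in the output. Appending a single event about two hours later emits it.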

Hope it helps
-Hector


Re: Non-Determinism in Table-API with Kafka and Event Time

Posted by Theodor Wübker <th...@inside-m2m.de>.
Hey,

one more thing, the query looks like this:

SELECT window_start, window_end, a, b, c, COUNT(*) AS x
FROM TABLE(
  TUMBLE(TABLE data.v1, DESCRIPTOR(timeStampData), INTERVAL '1' HOUR))
GROUP BY window_start, window_end, a, b, c

When the non-determinism occurs, the topic is not keyed at all. When I key it by the attribute “a”, I get incorrect but deterministic results. Maybe in the second case only 1 of the 10 partitions is consumed at a time?

Best,
Theo



Re: Non-Determinism in Table-API with Kafka and Event Time

Posted by Theodor Wübker <th...@inside-m2m.de>.
Hey Yuxia,

thanks for your response. I also figured that the events arrive in a (somewhat) random order and thus cause the non-determinism. I used a watermark like this: "timeStampData - INTERVAL '10' SECOND". Increasing the watermark interval does not solve the problem though; the results are still not deterministic. Instead, I keyed the 10-partition topic. Now the results are deterministic, but they are incorrect (way too few). Am I doing something fundamentally wrong? I just need the messages to be somewhat in order (just so they don’t violate the watermark).

Best,
Theo

(Sent again, sorry; by accident I previously responded only to you, not to the mailing list.)



Re: Non-Determinism in Table-API with Kafka and Event Time

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
Hi Theo,
I'm wondering what the event-time windowed query you are using looks like.
For example, how do you define the watermark?
Since you read records from 10 partitions, the records may well arrive at the window operator out of order.
Is it possible that the watermark has already passed the timestamps of some records that are still to arrive?

If that's the case, the set of records used to calculate the result may well differ on every run, which leads to non-deterministic results.
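
A rough way to see this is the following plain-Python sketch (not Flink code; the function name, sample timestamps, and the simplified rule that a record is late once the watermark has reached the end of its window are assumptions for illustration). It applies a single watermark, derived from the largest timestamp seen so far minus a 10-second delay, to the same five records in two arrival orders.

```python
from collections import Counter


def windowed_counts(arrivals, size=3600, delay=10):
    """Count records per tumbling window under one global watermark.

    arrivals: timestamps in seconds, in arrival order.
    Returns (counts per window start, number of dropped late records).
    """
    counts = Counter()
    dropped = 0
    watermark = float("-inf")
    for ts in arrivals:
        start = ts - ts % size
        if start + size <= watermark:
            dropped += 1  # the window already closed before this record arrived
        else:
            counts[start] += 1
        watermark = max(watermark, ts - delay)
    return dict(counts), dropped


# Same records, two arrival orders (e.g. partitions read in a different interleaving).
print(windowed_counts([100, 200, 300, 4000, 4100]))
print(windowed_counts([4000, 100, 4100, 200, 300]))
```

In the first order all five records are counted. In the second, the record at 4000 moves the watermark past the end of the first window, so the three earlier records are dropped and the first window is missing from the result.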

Best regards,
Yuxia
