You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Frank Dekervel <fr...@kapernikov.com> on 2022/02/09 16:03:26 UTC

huge number of duplicate numbers after experiencing a crash loop in deserializer

Hello,

Due to a malformed message in our input queue (kafka), our 
DeserialisationSchema threw an exception, making our flink application 
crash. Since our application was configured to restart, it restarted, 
only to reprocess the same malformed message and crash again.

This happened for a while until we fixed the job and made the 
DeserialisationSchema return null on malformed messages. After that, we 
restarted the job from the last successful savepoint (just before the 
malformed message) and we got a huge number of duplicate messages 
generated from our processors (seems one message for every time flink 
restarted).

In addition to this, one of our map functions had side effects and these 
side effects were also executed a huge number of times. We basically 
have this topology:

kafka source --> stateful process function with event time timers --> 
(slow) map function with side effect (post to external API) --> kafka 
sink (at least once)

We don't use the exactly once sink (instead we use at least once), 
because an occasional duplicate would not harm us and we need low 
latency. However, having massive number of duplicates is a problem.

So i'm trying to understand how checkpoints+savepoints really work and 
in what situation we could end up having a massive amount of duplicates. 
The only way i could think of is the following scenario:

  * the application starts up
  * the stateful process function treats some incoming messages from
    kafka and generates some outgoing messages
  * the slow map function starts processing these messages, and at the
    same time a checkpoint is saved (somehow without new kafka offsets ???)
  * the application crashes on the malformed input

then again:

  * application restarts
  * the stateful process function treats again the same incoming
    messages from kafka, generating exactly the same in flight messages
    again (we use deterministic IDs for these messages and we see the
    same ID being generated over and over).
  * a checkpoint is saved with more in flight messages, the map function
    is slow hence doesn't catch up
  * the application crashes again on the same input.

Are in flight messages stored in a checkpoint somehow ? Is the above 
scenario even possible (reading the design of flink i would think no, 
but then i have no other explanation). We had this once more in the past 
(then it was a crash in another branch of the same dataflow).

Greetings,
Frank




Re: huge number of duplicate numbers after experiencing a crash loop in deserializer

Posted by Frank Dekervel <fr...@kapernikov.com>.
Hello,

Small heads up, we found the cause. It had nothing to do with flink, but 
it was a bug in our own code.
We used CEP to detect when senders would stop send messages (basically 
we used the timeout of a CEP pattern). And when generating these timeout 
messages we made inconsistent use of the different timestamps resulting 
in out of order messages, which were then mistreated in a subsequent 
step ...

Frank




On 09.02.22 17:03, Frank Dekervel wrote:
>
> Hello,
>
> Due to a malformed message in our input queue (kafka), our 
> DeserialisationSchema threw an exception, making our flink application 
> crash. Since our application was configured to restart, it restarted, 
> only to reprocess the same malformed message and crash again.
>
> This happened for a while until we fixed the job and made the 
> DeserialisationSchema return null on malformed messages. After that, 
> we restarted the job from the last successful savepoint (just before 
> the malformed message) and we got a huge number of duplicate messages 
> generated from our processors (seems one message for every time flink 
> restarted).
>
> In addition to this, one of our map functions had side effects and 
> these side effects were also executed a huge number of times. We 
> basically have this topology:
>
> kafka source --> stateful process function with event time timers --> 
> (slow) map function with side effect (post to external API) --> kafka 
> sink (at least once)
>
> We don't use the exactly once sink (instead we use at least once), 
> because an occasional duplicate would not harm us and we need low 
> latency. However, having massive number of duplicates is a problem.
>
> So i'm trying to understand how checkpoints+savepoints really work and 
> in what situation we could end up having a massive amount of 
> duplicates. The only way i could think of is the following scenario:
>
>   * the application starts up
>   * the stateful process function treats some incoming messages from
>     kafka and generates some outgoing messages
>   * the slow map function starts processing these messages, and at the
>     same time a checkpoint is saved (somehow without new kafka offsets
>     ???)
>   * the application crashes on the malformed input
>
> then again:
>
>   * application restarts
>   * the stateful process function treats again the same incoming
>     messages from kafka, generating exactly the same in flight
>     messages again (we use deterministic IDs for these messages and we
>     see the same ID being generated over and over).
>   * a checkpoint is saved with more in flight messages, the map
>     function is slow hence doesn't catch up
>   * the application crashes again on the same input.
>
> Are in flight messages stored in a checkpoint somehow ? Is the above 
> scenario even possible (reading the design of flink i would think no, 
> but then i have no other explanation). We had this once more in the 
> past (then it was a crash in another branch of the same dataflow).
>
> Greetings,
> Frank
>
>
>
>