Posted to user@flink.apache.org by Frank Dekervel <fr...@kapernikov.com> on 2022/02/09 16:03:26 UTC
huge number of duplicate numbers after experiencing a crash loop in deserializer
Hello,
Due to a malformed message in our input queue (Kafka), our
DeserializationSchema threw an exception, making our Flink application
crash. Since our application was configured to restart, it restarted,
only to reprocess the same malformed message and crash again.
This happened for a while until we fixed the job and made the
DeserializationSchema return null on malformed messages. After that, we
restarted the job from the last successful savepoint (just before the
malformed message) and we got a huge number of duplicate messages
generated from our processors (apparently one duplicate for every time
Flink restarted).
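A minimal sketch of the "return null on malformed messages" idea, written in plain Python rather than against Flink's API. The JSON payload format, the function names `deserialize` and `read_all`, and the rule that only JSON objects are valid are assumptions made for illustration; they are not taken from the actual job.

```python
import json
from typing import Optional


def deserialize(raw: bytes) -> Optional[dict]:
    """Parse one record; return None instead of raising on malformed input."""
    try:
        value = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    # Hypothetical schema check: only JSON objects count as valid records.
    return value if isinstance(value, dict) else None


def read_all(raw_records):
    """Mimic a source that silently drops records deserialized to None."""
    return [rec for rec in map(deserialize, raw_records) if rec is not None]


records = read_all([b'{"id": 1}', b'{"id": ', b"\xff\xfe", b"[1, 2]", b'{"id": 2}'])
```

The point of the pattern is that one bad record is dropped instead of failing the whole job; in practice it is probably worth counting or logging the dropped records so that they do not disappear unnoticed.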
In addition to this, one of our map functions had side effects and these
side effects were also executed a huge number of times. We basically
have this topology:
kafka source --> stateful process function with event time timers -->
(slow) map function with side effect (post to external API) --> kafka
sink (at least once)
We don't use the exactly once sink (instead we use at least once),
because an occasional duplicate would not harm us and we need low
latency. However, having a massive number of duplicates is a problem.
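With an at-least-once pipeline, one common way to limit the damage of replays is to make the side effect idempotent by keying it on a message ID. The sketch below is only an illustration of that idea: the class `IdempotentPoster`, the callback `post`, and the in-memory set are hypothetical, and it assumes every message carries a deterministic ID.

```python
class IdempotentPoster:
    """Wrap a side-effecting call so that each message ID is posted at most once."""

    def __init__(self, post):
        self._post = post    # hypothetical callable that performs the real API call
        self._seen = set()   # IDs already posted (in memory only, for illustration)

    def __call__(self, message_id, payload):
        if message_id in self._seen:
            return False     # duplicate from a replay: skip the side effect
        self._post(payload)
        self._seen.add(message_id)
        return True


sent = []
poster = IdempotentPoster(sent.append)
results = [poster(mid, body) for mid, body in [("m1", "x"), ("m2", "y"), ("m1", "x")]]
```

An in-memory set like this does not survive a job restart, so in a real deployment the record of already-seen IDs would have to live somewhere durable, for example on the receiving side of the external API.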
So I'm trying to understand how checkpoints and savepoints really work,
and in what situation we could end up with a massive number of duplicates.
The only way I could think of is the following scenario:
* the application starts up
* the stateful process function processes some incoming messages from
Kafka and generates some outgoing messages
* the slow map function starts processing these messages, and at the
same time a checkpoint is saved (somehow without new Kafka offsets ???)
* the application crashes on the malformed input
then again:
* the application restarts
* the stateful process function processes the same incoming messages
from Kafka again, generating exactly the same in-flight messages
again (we use deterministic IDs for these messages and we see the
same ID being generated over and over)
* a checkpoint is saved with more in-flight messages; the map function
is slow and hence doesn't catch up
* the application crashes again on the same input
Are in-flight messages stored in a checkpoint somehow? Is the above
scenario even possible? (From reading the design of Flink I would think
not, but then I have no other explanation.) We have seen this once before
(that time it was a crash in another branch of the same dataflow).
Greetings,
Frank
Re: huge number of duplicate numbers after experiencing a crash loop in deserializer
Posted by Frank Dekervel <fr...@kapernikov.com>.
Hello,
A small heads-up: we found the cause. It had nothing to do with Flink;
it was a bug in our own code.
We used CEP to detect when senders stop sending messages (basically
we used the timeout of a CEP pattern). When generating these timeout
messages we used the different timestamps inconsistently, resulting
in out-of-order messages, which were then mishandled in a subsequent
step ...
Frank