Posted to users@kafka.apache.org by Andrew Wilcox <an...@gmail.com> on 2018/11/02 00:33:45 UTC
Deduplicating a topic in the face of producer crashes over a time window?
Suppose I have a producer which is ingesting a data stream with unique keys
from an external service and sending it to a Kafka topic. In my producer I
can set enable.idempotence and get exactly-once delivery in the presence of
broker crashes. However, my producer might crash after it delivers a batch
of messages to Kafka but before it records that the batch was delivered.
After restarting, the crashed producer would re-deliver the same batch,
resulting in duplicate messages in the topic.
With a Streams transformer I can deduplicate the topic by using a state
store to record previously seen keys and then only emitting an output
record if the key hasn't been seen before. However, without a mechanism to
remove old keys, the state store will grow without bound.
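Stripped of the Kafka Streams plumbing, the core of that transformer is just a first-sighting check. A minimal sketch in plain Java (names are illustrative; in Streams the set would be a fault-tolerant, persistent state store rather than an in-memory HashSet):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Minimal dedup sketch: forward a record only the first time its key is seen.
// In Kafka Streams the Set would be a changelogged KeyValueStore, not a HashSet.
public class Dedup {
    private final Set<String> seenKeys = new HashSet<>();

    // Returns true if the record should be forwarded downstream.
    public boolean firstTimeSeen(String key) {
        // Set.add() returns false if the key was already present.
        return seenKeys.add(key);
    }

    public static void main(String[] args) {
        Dedup dedup = new Dedup();
        List<String> forwarded = new ArrayList<>();
        for (String key : new String[] {"a", "b", "a", "c", "b"}) {
            if (dedup.firstTimeSeen(key)) {
                forwarded.add(key);
            }
        }
        System.out.println(forwarded); // duplicates dropped: [a, b, c]
    }
}
```

This also makes the unbounded-growth problem visible: nothing ever removes entries from seenKeys.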
Say I only want to deduplicate over a time period such as one day. (I'm
confident that I'll be able to restart a crashed producer sooner than that.)
Thus I'd like keys older than a day to expire out of the state store, so the
store only needs to keep track of keys seen in the last day or so.
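The time-bounded variant can be sketched the same way: remember the last-seen timestamp per key and purge entries older than the window. Again a plain-Java illustration under assumed names; in Kafka Streams a windowed store with a retention period would play the role of the timestamp map:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Dedup over a time window: a key suppresses duplicates only for ttlMillis,
// and expired entries are purged so the store stays bounded.
public class WindowedDedup {
    private final long ttlMillis;
    private final Map<String, Long> lastSeen = new HashMap<>();

    public WindowedDedup(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns true if the record should be forwarded (not a duplicate within the window).
    public boolean firstTimeSeen(String key, long nowMillis) {
        purgeExpired(nowMillis);
        // Note: recording the timestamp even for duplicates slides the window
        // forward on every sighting; recording only first sightings is the
        // other reasonable choice.
        Long previous = lastSeen.put(key, nowMillis);
        return previous == null;
    }

    // Drop entries older than the window so memory tracks recent traffic only.
    private void purgeExpired(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() > ttlMillis) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        WindowedDedup dedup = new WindowedDedup(86_400_000L); // one-day window
        System.out.println(dedup.firstTimeSeen("k1", 0L));           // true: first sighting
        System.out.println(dedup.firstTimeSeen("k1", 60_000L));      // false: duplicate within the day
        System.out.println(dedup.firstTimeSeen("k1", 200_000_000L)); // true: earlier entry expired
    }
}
```

A real Streams implementation would additionally have to decide whether "now" is wall-clock time or record (event) time, since a restarted producer may re-send records with old timestamps.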
Is there a way to do this with Kafka Streams? Or is there another
recommended mechanism to keep a topic of unique-keyed messages free of
duplicates in the presence of producer crashes?
Thanks!
Andrew
Re: Deduplicating a topic in the face of producer crashes over a time window?
Posted by Andrew Wilcox <an...@gmail.com>.
Looks great! Thank you Patrik.
On Fri, Nov 2, 2018 at 2:38 AM Patrik Kleindl <pk...@gmail.com> wrote:
> [...]
Re: Deduplicating a topic in the face of producer crashes over a time window?
Posted by Patrik Kleindl <pk...@gmail.com>.
Hi Andrew
Did you take a look at this example?
https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
We are using it for a case like the one you described.
Growth should be limited with this approach.
Best regards
Patrik
> On 02.11.2018 at 01:33, Andrew Wilcox <an...@gmail.com> wrote:
> [...]