Posted to users@kafka.apache.org by Andrew Wilcox <an...@gmail.com> on 2018/11/02 00:33:45 UTC

Deduplicating a topic in the face of producer crashes over a time window?

Suppose I have a producer which is ingesting a data stream with unique keys
from an external service and sending it to a Kafka topic.  In my producer I
can set enable.idempotence and get exactly-once delivery in the presence of
broker crashes.  However my producer might crash after it delivers a batch
of messages to Kafka but before it records that the batch was delivered.
After restarting the crashed producer it would re-deliver the same batch,
resulting in duplicate messages in the topic.
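
For reference, here is roughly what the producer side looks like (just a
sketch; the broker address, topic name, class name, and serializers are
placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class IngestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Idempotence guards against duplicates from broker-side retries,
        // but not against this process re-sending a batch after its own crash.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("events", "some-unique-key", "payload"));
            producer.flush();
        }
    }
}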

With a streams transformer I can deduplicate the topic by using a state
store to record previously seen keys and then only creating an output
record if the key hasn't been seen before.  However without a mechanism to
remove old keys the state store will grow without bound.
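
Here is a sketch of the kind of transformer I mean, with the unbounded store
(store and topic names are made up, keys are assumed to be strings, and this
assumes a current Kafka Streams version):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class DedupTopology {
    static final String STORE = "seen-keys";

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(STORE), Serdes.String(), Serdes.Long()));

        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
                private KeyValueStore<String, Long> seen;
                private ProcessorContext context;

                @SuppressWarnings("unchecked")
                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                    this.seen = (KeyValueStore<String, Long>) context.getStateStore(STORE);
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    if (seen.get(key) != null) {
                        return null;                        // already seen: drop the duplicate
                    }
                    seen.put(key, context.timestamp());     // remember when the key was first seen
                    return KeyValue.pair(key, value);
                }

                @Override
                public void close() {}
            }, STORE)
            .to("deduped-topic", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}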

Say I only want to deduplicate over a time period such as one day.  (I'm
confident that I'll be able to restart a crashed producer sooner than that.)
Thus I'd like keys older than a day to expire out of the state store, so the
store only needs to keep track of keys seen in the last day or so.

Is there a way to do this with Kafka Streams?  Or is there another
recommended mechanism to keep messages with unique keys free of duplicates
in the presence of producer crashes?

Thanks!

Andrew

Re: Deduplicating a topic in the face of producer crashes over a time window?

Posted by Andrew Wilcox <an...@gmail.com>.
Looks great!  Thank you Patrik.

On Fri, Nov 2, 2018 at 2:38 AM Patrik Kleindl <pk...@gmail.com> wrote:

> Hi Andrew
>
> Did you take a look at
>
> https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
> ?
> We are using this for a case like you described.
> Growth should be limited with this approach.
>
> Best regards
> Patrik
>
> > On 02.11.2018, at 01:33, Andrew Wilcox <an...@gmail.com> wrote:
> >
> > Suppose I have a producer which is ingesting a data stream with unique keys
> > from an external service and sending it to a Kafka topic.  In my producer I
> > can set enable.idempotence and get exactly-once delivery in the presence of
> > broker crashes.  However my producer might crash after it delivers a batch
> > of messages to Kafka but before it records that the batch was delivered.
> > After restarting the crashed producer it would re-deliver the same batch,
> > resulting in duplicate messages in the topic.
> >
> > With a streams transformer I can deduplicate the topic by using a state
> > store to record previously seen keys and then only creating an output
> > record if the key hasn't been seen before.  However without a mechanism to
> > remove old keys the state store will grow without bound.
> >
> > Say I only want to deduplicate over a time period such as one day.  (I'm
> > confident that I'll be able to restart a crashed producer sooner than that.)
> > Thus I'd like keys older than a day to expire out of the state store, so the
> > store only needs to keep track of keys seen in the last day or so.
> >
> > Is there a way to do this with Kafka Streams?  Or is there another
> > recommended mechanism to keep messages with unique keys free of duplicates
> > in the presence of producer crashes?
> >
> > Thanks!
> >
> > Andrew
>

Re: Deduplicating a topic in the face of producer crashes over a time window?

Posted by Patrik Kleindl <pk...@gmail.com>.
Hi Andrew

Did you take a look at
https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
?
We are using this for a case like you described.
Growth should be limited with this approach.
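
The rough shape of it is something like this (just a sketch, not the exact
code from the example; store and topic names are illustrative, and it
assumes a Streams version with the Duration-based store APIs, 2.1 or newer):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class WindowedDedupTopology {
    static final String STORE = "dedup-window-store";
    static final Duration RETENTION = Duration.ofDays(1);

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        // A window store expires entries older than the retention period,
        // so it only ever holds roughly the last day's worth of keys.
        builder.addStateStore(Stores.windowStoreBuilder(
            Stores.persistentWindowStore(STORE, RETENTION, RETENTION, false),
            Serdes.String(), Serdes.Long()));

        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
                private WindowStore<String, Long> seen;
                private ProcessorContext context;

                @SuppressWarnings("unchecked")
                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                    this.seen = (WindowStore<String, Long>) context.getStateStore(STORE);
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    long now = context.timestamp();
                    // Was this key already seen within the dedup window?
                    try (WindowStoreIterator<Long> hits =
                             seen.fetch(key, now - RETENTION.toMillis(), now)) {
                        if (hits.hasNext()) {
                            return null;                   // seen within the last day: drop it
                        }
                    }
                    seen.put(key, now, now);               // record the first sighting at event time
                    return KeyValue.pair(key, value);
                }

                @Override
                public void close() {}
            }, STORE)
            .to("deduped-topic", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}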

Best regards 
Patrik

> On 02.11.2018, at 01:33, Andrew Wilcox <an...@gmail.com> wrote:
> 
> Suppose I have a producer which is ingesting a data stream with unique keys
> from an external service and sending it to a Kafka topic.  In my producer I
> can set enable.idempotence and get exactly-once delivery in the presence of
> broker crashes.  However my producer might crash after it delivers a batch
> of messages to Kafka but before it records that the batch was delivered.
> After restarting the crashed producer it would re-deliver the same batch,
> resulting in duplicate messages in the topic.
> 
> With a streams transformer I can deduplicate the topic by using a state
> store to record previously seen keys and then only creating an output
> record if the key hasn't been seen before.  However without a mechanism to
> remove old keys the state store will grow without bound.
> 
> Say I only want to deduplicate over a time period such as one day.  (I'm
> confident that I'll be able to restart a crashed producer sooner than that.)
> Thus I'd like keys older than a day to expire out of the state store, so the
> store only needs to keep track of keys seen in the last day or so.
> 
> Is there a way to do this with Kafka Streams?  Or is there another
> recommended mechanism to keep messages with unique keys free of duplicates
> in the presence of producer crashes?
> 
> Thanks!
> 
> Andrew