Posted to users@kafka.apache.org by Garry Turkington <g....@improvedigital.com> on 2014/02/10 18:11:37 UTC

Building a producer/consumer supporting exactly-once messaging

Hi,

I've been prototyping on Kafka for a few months now and like what I see. It's a good fit for some of my use cases in data distribution but also for processing; I like a lot of what I see in Samza. I'm now working through some of the operational issues and have a question for the community.

I have several data sources that I want to push into Kafka, but some of the most important arrive as a stream of files dropped into either an SFTP location or S3. Conceptually the data is really a stream, but it's being chunked and made more batch-like by the deployment model of the operational servers. So pulling the data into Kafka and seeing it as a stream again is a big plus.

But I really don't want duplicate messages. I know Kafka provides at-least-once semantics and that's fine; I'm happy to have the de-dupe logic external to Kafka. And on the producer side I can build up a protocol around adding record metadata and using ZooKeeper to give me pretty high confidence that my clients will know whether they are reading from a file that was fully published into Kafka or not.

I had assumed that this wouldn't be a unique use case but on doing a bunch of searches I really don't find much in terms of either tools that help or even just best practice patterns for handling this type of need to support exactly-once message processing.

So now I'm thinking that either I just need better web search skills or that actually this isn't something many others are doing and if so then there's likely a reason for that.

Any thoughts?

Thanks
Garry


RE: Building a producer/consumer supporting exactly-once messaging

Posted by Garry Turkington <g....@improvedigital.com>.
Thanks Jay for the info, and Neha for adding it to the FAQ!

On the producer side I've been going down Jay's second route, i.e. adding metadata to the messages as they are published. In my case, though, I don't just want to avoid duplicates on a per-message basis; I also want to be able to quickly identify a partially ingested file so I can drop any related messages.

Since I'll have multiple producers I'm looking to ZooKeeper to help ensure only one reads a given file at a time. I can then add the filename and a producer UUID to each message and, after a file is fully written, either publish a completion notice to a different topic or mark the file's ZNode appropriately. A client can then tell whether the copy of a given message it is reading comes from the 'committed' ingest of the file (matching producer UUID) or from an ingest that was only partial and should be ignored.
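
To make the client-side check concrete, here is a minimal Python sketch. The TaggedMessage type, the committed_only function and the plain dict holding the committed filename-to-UUID mapping are all made up for illustration; in practice that mapping would be built from the completion topic or the file ZNodes, and a client would need to buffer or revisit messages for files whose outcome isn't known yet.

```python
from typing import Dict, Iterable, Iterator, NamedTuple


class TaggedMessage(NamedTuple):
    """A message carrying the metadata the producer attaches (hypothetical layout)."""
    filename: str
    producer_uuid: str
    payload: str


def committed_only(
    messages: Iterable[TaggedMessage],
    committed: Dict[str, str],
) -> Iterator[TaggedMessage]:
    """Yield only messages written by the producer that completed the file.

    `committed` maps filename -> UUID of the producer whose ingest finished.
    Messages from any other producer UUID, or from files with no committed
    ingest at all, are treated as partial ingests and skipped.
    """
    for msg in messages:
        if committed.get(msg.filename) == msg.producer_uuid:
            yield msg
```

Something like committed_only(stream, {"a.csv": "p2"}) would then pass through only the copies of a.csv records written by producer p2.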

I think this holds together and, given my single-file-reader requirement, I'll always need extra machinery outside of Kafka. But if things like producer idempotence are possible and truly cheaper server side then that'd be very interesting.

I found Jay's wiki page on the idempotent producer support and that looks really good. Since in that model the pid looks to be something the client sends with each message, I could change my workflow to be:

1. Producer gains ZK lock on a file ZNode
2. Producer adds the pid as an attribute on the file ZNode if none is already associated with it
3. Producer starts reading/sending messages
4. If a producer fails, another can look for the pid attribute and use it when resending the messages
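
The four steps can be sketched roughly as below, in Python. This is an in-memory illustration only: FileNode stands in for the file's ZNode and is not a ZooKeeper client, the send callback stands in for the producer, and the per-record sequence number is my own assumption about what a server would need alongside the pid to recognise a resend.

```python
import threading
import uuid
from typing import Callable, Iterable, Optional


class FileNode:
    """In-memory stand-in for a file's ZNode (hypothetical, not a ZooKeeper API)."""

    def __init__(self, filename: str) -> None:
        self.filename = filename
        self.lock = threading.Lock()      # plays the role of the ZK lock
        self.pid: Optional[str] = None    # plays the role of the pid attribute


def ingest(
    node: FileNode,
    records: Iterable[str],
    send: Callable[[str, int, str], None],
) -> str:
    """Send every record of a file, tagging each with the file's pid."""
    with node.lock:                       # step 1: gain the lock on the file
        if node.pid is None:              # step 2: set the pid only if none exists
            node.pid = uuid.uuid4().hex
        for seq, record in enumerate(records):   # step 3: read and send
            send(node.pid, seq, record)
        return node.pid
    # Step 4 needs no extra code: if send() raises, the lock is released and
    # node.pid stays set, so the next producer to call ingest() reuses it.
```

Whether the resent records are actually dropped depends on the server-side support discussed on the wiki page; the sketch only shows the pid being handed from one producer to the next.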

Very interested in this whole topic.

Garry


Re: Building a producer/consumer supporting exactly-once messaging

Posted by Jay Kreps <ja...@gmail.com>.
Ack, nice, should have thought of doing that...

-Jay

Re: Building a producer/consumer supporting exactly-once messaging

Posted by Neha Narkhede <ne...@gmail.com>.
Added this to our FAQ -
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIgetexactlyonemessagingfromKafka?


Re: Building a producer/consumer supporting exactly-once messaging

Posted by Jay Kreps <ja...@gmail.com>.
The out-of-the-box support for this in Kafka isn't great right now.

Exactly-once semantics has two parts: avoiding duplication during data
production and avoiding duplicates during data consumption.

There are two approaches to getting exactly-once semantics during data
production:

1. Use a single writer per partition and, every time you get a network error,
check the last message in that partition to see if your last write succeeded.
2. Include a primary key (UUID or something) in the message and deduplicate
on the consumer.
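
As a rough illustration of the two approaches, here is a small Python sketch. Everything in it is a stand-in: FlakyPartition is an in-memory list that pretends to lose acks, not a Kafka client, and send_checked and deduplicate are hypothetical names. The tail check in approach 1 also assumes each message is distinguishable from the one before it (for example because it carries an id).

```python
from typing import Hashable, Iterable, Iterator, List, Optional, Tuple


class FlakyPartition:
    """In-memory stand-in for one partition whose acks can get lost (hypothetical)."""

    def __init__(self, lost_acks: int = 0) -> None:
        self.log: List[str] = []
        self._lost_acks = lost_acks

    def append(self, message: str) -> None:
        self.log.append(message)          # in this model the write always lands
        if self._lost_acks > 0:           # but the ack may not come back
            self._lost_acks -= 1
            raise ConnectionError("ack lost")

    def last(self) -> Optional[str]:
        return self.log[-1] if self.log else None


def send_checked(partition: FlakyPartition, message: str) -> None:
    """Approach 1: single writer; on error, inspect the log tail before retrying."""
    try:
        partition.append(message)
    except ConnectionError:
        if partition.last() != message:   # the write did not land, so retry once
            partition.append(message)


def deduplicate(
    messages: Iterable[Tuple[Hashable, str]],
) -> Iterator[Tuple[Hashable, str]]:
    """Approach 2: drop any message whose primary key was already seen."""
    seen = set()
    for key, value in messages:
        if key not in seen:
            seen.add(key)
            yield key, value
```

Note that the seen set in deduplicate grows without bound; a real consumer would presumably keep a bounded window or push the check into its storage system.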

If you do one of these things, the log that Kafka hosts will be duplicate
free. However, reading without duplicates depends on some co-operation from
the consumer too. If the consumer is periodically checkpointing its
position, then if it fails and restarts it will restart from the
checkpointed position. Thus, if the data output and the checkpoint are not
written atomically, it will be possible to get duplicates here as well. This
problem is particular to your storage system. For example, if you are using
a database you could commit these together in a transaction. The HDFS
loader Camus that LinkedIn wrote does something like this for Hadoop loads.
The other alternative, which doesn't require a transaction, is to store the
offset with the data loaded and deduplicate using the
topic/partition/offset combination.
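
A minimal sketch of both ideas, using SQLite from the Python standard library as the storage system. The table and function names are invented for illustration. The loaded table keys each row on topic/partition/offset so a replayed message is ignored, and the checkpoint row is updated in the same transaction as the data.

```python
import sqlite3


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Create the (hypothetical) output and checkpoint tables if missing."""
    db = sqlite3.connect(path)
    db.execute(
        "CREATE TABLE IF NOT EXISTS loaded ("
        " topic TEXT, partition_id INTEGER, msg_offset INTEGER, payload TEXT,"
        " PRIMARY KEY (topic, partition_id, msg_offset))"
    )
    db.execute(
        "CREATE TABLE IF NOT EXISTS checkpoint ("
        " topic TEXT, partition_id INTEGER, next_offset INTEGER,"
        " PRIMARY KEY (topic, partition_id))"
    )
    return db


def store(db: sqlite3.Connection, topic: str, partition_id: int,
          msg_offset: int, payload: str) -> bool:
    """Write one message and its checkpoint atomically; False if it was a replay."""
    with db:  # one transaction: commit on success, roll back on exception
        cur = db.execute(
            "INSERT OR IGNORE INTO loaded VALUES (?, ?, ?, ?)",
            (topic, partition_id, msg_offset, payload),
        )
        inserted = cur.rowcount == 1
        if inserted:  # only advance the checkpoint for genuinely new rows
            db.execute(
                "INSERT OR REPLACE INTO checkpoint VALUES (?, ?, ?)",
                (topic, partition_id, msg_offset + 1),
            )
        return inserted


def resume_offset(db: sqlite3.Connection, topic: str, partition_id: int) -> int:
    """Offset to restart from after a failure (0 if nothing was loaded yet)."""
    row = db.execute(
        "SELECT next_offset FROM checkpoint WHERE topic = ? AND partition_id = ?",
        (topic, partition_id),
    ).fetchone()
    return row[0] if row else 0
```

On restart a consumer would call resume_offset and fetch from there; if it still re-reads something, the primary key turns the second write into a no-op.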

I think there are two improvements that would make this a lot easier:
1. I think producer idempotence is something that could be done
automatically and much more cheaply by optionally integrating support for
this on the server.
2. The existing high-level consumer doesn't expose a lot of the more
fine-grained control of offsets (e.g. to reset your position). We will be
working on that soon.

-Jay

Re: Building a producer/consumer supporting exactly-once messaging

Posted by Pradeep Gollakota <pr...@gmail.com>.
Have you read this part of the documentation?
http://kafka.apache.org/documentation.html#semantics

Just wondering if that solves your use case.