Posted to users@kafka.apache.org by Tom Brown <to...@gmail.com> on 2012/10/25 23:44:18 UTC

Transactional writing

Is there an accepted, or recommended way to make writes to a Kafka
queue idempotent, or within a transaction?

I can configure my system such that each queue has exactly one producer.

(If there are no accepted/recommended ways, I have a few ideas I would
like to propose. I would also be willing to implement them if needed)

Thanks in advance!

--Tom

Re: Transactional writing

Posted by Tom Brown <to...@gmail.com>.
That was exactly the conclusion I arrived at, which is why I don't
want to worry too much about that case.

Back to the original two proposals-- If I'm going to attempt to
implement one or the other, do any of you who are familiar with the
code want to offer advice as to which is preferred (or if no
preference, which would be easier to implement)?

--Tom

On Fri, Oct 26, 2012 at 10:24 AM, Jun Rao <ju...@gmail.com> wrote:
> In a database, if you know the key of the last record that you are writing,
> you can do a read to see if your last change is actually made. However, if
> you use auto-generated keys by the server and are doing an insert, you
> don't even know the key on failure. So, in general, this is an unsolved
> problem and needs to be handled at the application level. Most applications
> will just do resend and deal with the consequence of potential duplicates.
>
> Thanks,
>
> Jun
>
> On Thu, Oct 25, 2012 at 10:00 PM, Tom Brown <to...@gmail.com> wrote:
>
>> How do other systems deal with that? If I send "commit" to Oracle, but
>> my connection dies before I get the ack, is the data committed or not?
>>
>> What about the other case? If I send "commit" to Oracle, but the
>> server dies before I get the ack, is the data committed or not?
>>
>> In either case, how can I tell?
>>
>> --Tom
>>
>> On Fri, Oct 26, 2012 at 12:15 AM, Jun Rao <ju...@gmail.com> wrote:
>> > Even if you have transaction support, the same problem exists. If the
>> > client died before receiving the ack, it's not sure whether the broker
>> > really committed the data or not.
>> >
>> > To address this issue, the client can save the offset of committed
>> messages
>> > periodically. On restart from a crash, it first reads all messages after
>> > the last saved offset. It then knows whether the last message is
>> committed
>> > or not and can decide whether the message should be resent or not. This
>> > probably only works for a single producer.
>> >
>> > Thanks,
>> >
>> > Jun
>> >
>> > On Thu, Oct 25, 2012 at 6:31 PM, Philip O'Toole <ph...@loggly.com>
>> wrote:
>> >
>> >> On Thu, Oct 25, 2012 at 06:19:04PM -0700, Neha Narkhede wrote:
>> >> > The closest concept of transaction on the publisher side, that I can
>> >> > think of, is using batch of messages in a single call to the
>> >> > synchronous producer.
>> >> >
>> >> > Precisely, you can configure a Kafka producer to use the "sync" mode
>> >> > and batch messages that require transactional guarantees in a
>> >> > single send() call. That will ensure that either all the messages in
>> >> > the batch are sent or none.
>> >>
>> >> This is an interesting feature -- something I wasn't aware of. Still it
>> >> doesn't solve the problem *completely*. As many people realise, it's
>> still
>> >> possible for the batch of messages to get into Kafka fine, but the ack
>> from
>> >> Kafka to be lost on its way back to the Producer. In that case the
>> Producer
>> >> erroneously believes the messages didn't get in, and might re-send them.
>> >>
>> >> You guys *haven't* solved that issue, right? I believe you write about
>> it
>> >> on
>> >> the Kafka site.
>> >>
>> >> >
>> >> > Thanks,
>> >> > Neha
>> >> >
>> >> > On Thu, Oct 25, 2012 at 2:44 PM, Tom Brown <to...@gmail.com>
>> wrote:
>> >> > > Is there an accepted, or recommended way to make writes to a Kafka
>> >> > > queue idempotent, or within a transaction?
>> >> > >
>> >> > > I can configure my system such that each queue has exactly one
>> >> producer.
>> >> > >
>> >> > > (If there are no accepted/recommended ways, I have a few ideas I
>> would
>> >> > > like to propose. I would also be willing to implement them if
>> needed)
>> >> > >
>> >> > > Thanks in advance!
>> >> > >
>> >> > > --Tom
>> >>
>> >> --
>> >> Philip O'Toole
>> >>
>> >> Senior Developer
>> >> Loggly, Inc.
>> >> San Francisco, Calif.
>> >> www.loggly.com
>> >>
>> >> Come join us!
>> >> http://loggly.com/company/careers/
>> >>
>>

Re: Transactional writing

Posted by Jay Kreps <ja...@gmail.com>.
Yes, I don't think any single-hop request/response protocol can avoid this
problem when performing general mutations on a remote system. This includes
essentially all common usage of web services, databases, messaging systems,
etc. That is to say, if the client gets a socket related error when
performing a remote mutation operation the client doesn't know if the
mutation occurred or not. There are consensus-type protocols that resolve
this kind of problem in the general case by allowing many participants to
come into agreement on whether or not the mutation took place, but these
have a number of drawbacks as a client-facing protocol and mostly aren't
used for this purpose. I think in practice most people rely on application
logic (making updates idempotent, deduplicating on the client, etc).
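
As a rough illustration of the application-level deduplication mentioned
above, here is a minimal sketch in which the sender attaches its own unique
ID to each message and the receiver ignores any ID it has already applied.
The names (DedupingReceiver, apply, msg_id) are hypothetical and are not part
of any Kafka API; the in-memory set stands in for whatever durable store a
real application would need.

```python
class DedupingReceiver:
    """Applies each message at most once, keyed by a sender-chosen message ID."""

    def __init__(self):
        self.seen_ids = set()   # IDs already applied (hypothetical in-memory store)
        self.total = 0          # example state mutated by incoming messages

    def apply(self, msg_id, amount):
        # A resend after a lost ack carries the same msg_id, so it is ignored.
        if msg_id in self.seen_ids:
            return False
        self.seen_ids.add(msg_id)
        self.total += amount
        return True


receiver = DedupingReceiver()
receiver.apply("m1", 5)
receiver.apply("m1", 5)   # duplicate caused by a retry
receiver.apply("m2", 7)
print(receiver.total)
```

The duplicate is dropped because the ID, not the payload, identifies the
message, so two legitimately identical payloads with different IDs would both
be applied.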

-Jay

On Fri, Oct 26, 2012 at 7:24 AM, Jun Rao <ju...@gmail.com> wrote:

> In a database, if you know the key of the last record that you are writing,
> you can do a read to see if your last change is actually made. However, if
> you use auto-generated keys by the server and are doing an insert, you
> don't even know the key on failure. So, in general, this is an unsolved
> problem and needs to be handled at the application level. Most applications
> will just do resend and deal with the consequence of potential duplicates.
>
> Thanks,
>
> Jun

Re: Transactional writing

Posted by Jun Rao <ju...@gmail.com>.
In a database, if you know the key of the last record that you are writing,
you can do a read to see if your last change is actually made. However, if
you use auto-generated keys by the server and are doing an insert, you
don't even know the key on failure. So, in general, this is an unsolved
problem and needs to be handled at the application level. Most applications
will just do resend and deal with the consequence of potential duplicates.

Thanks,

Jun

On Thu, Oct 25, 2012 at 10:00 PM, Tom Brown <to...@gmail.com> wrote:

> How do other systems deal with that? If I send "commit" to Oracle, but
> my connection dies before I get the ack, is the data committed or not?
>
> What about the other case? If I send "commit" to Oracle, but the
> server dies before I get the ack, is the data committed or not?
>
> In either case, how can I tell?
>
> --Tom

Re: Transactional writing

Posted by Tom Brown <to...@gmail.com>.
How do other systems deal with that? If I send "commit" to Oracle, but
my connection dies before I get the ack, is the data committed or not?

What about the other case? If I send "commit" to Oracle, but the
server dies before I get the ack, is the data committed or not?

In either case, how can I tell?

--Tom

On Fri, Oct 26, 2012 at 12:15 AM, Jun Rao <ju...@gmail.com> wrote:
> Even if you have transaction support, the same problem exists. If the
> client died before receiving the ack, it's not sure whether the broker
> really committed the data or not.
>
> To address this issue, the client can save the offset of committed messages
> periodically. On restart from a crash, it first reads all messages after
> the last saved offset. It then knows whether the last message is committed
> or not and can decide whether the message should be resent or not. This
> probably only works for a single producer.
>
> Thanks,
>
> Jun

Re: Transactional writing

Posted by Jun Rao <ju...@gmail.com>.
Even if you have transaction support, the same problem exists. If the
client died before receiving the ack, it's not sure whether the broker
really committed the data or not.

To address this issue, the client can save the offset of committed messages
periodically. On restart from a crash, it first reads all messages after
the last saved offset. It then knows whether the last message is committed
or not and can decide whether the message should be resent or not. This
probably only works for a single producer.
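
A minimal sketch of the recovery procedure described above, under stated
assumptions: a Python list stands in for a single partition, there is exactly
one producer, and payloads are unique so that a message can be recognised when
it is read back. InMemoryLog and recover_and_resend are hypothetical names,
not Kafka APIs.

```python
class InMemoryLog:
    """Stand-in for a single partition; append returns the message's offset."""

    def __init__(self):
        self.messages = []

    def append(self, payload):
        self.messages.append(payload)
        return len(self.messages) - 1

    def read_from(self, offset):
        return self.messages[offset:]


def recover_and_resend(log, last_saved_offset, pending):
    """Resend only the pending payloads not found after the last saved offset.

    Assumes a single producer and unique payloads (both are assumptions).
    """
    already_written = set(log.read_from(last_saved_offset + 1))
    resent = []
    for payload in pending:
        if payload not in already_written:
            log.append(payload)
            resent.append(payload)
    return resent


log = InMemoryLog()
saved = log.append("a")   # offset 0, recorded by the producer as committed
log.append("b")           # written, but the ack was lost before the crash
resent = recover_and_resend(log, saved, pending=["b", "c"])
print(resent)
print(log.messages)
```

With a second producer appending to the same partition, the messages read
back after the saved offset could belong to someone else, which is why the
approach is limited to a single writer.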

Thanks,

Jun

On Thu, Oct 25, 2012 at 6:31 PM, Philip O'Toole <ph...@loggly.com> wrote:

> On Thu, Oct 25, 2012 at 06:19:04PM -0700, Neha Narkhede wrote:
> > The closest concept of transaction on the publisher side, that I can
> > think of, is using batch of messages in a single call to the
> > synchronous producer.
> >
> > Precisely, you can configure a Kafka producer to use the "sync" mode
> > and batch messages that require transactional guarantees in a
> > single send() call. That will ensure that either all the messages in
> > the batch are sent or none.
>
> This is an interesting feature -- something I wasn't aware of. Still it
> doesn't solve the problem *completely*. As many people realise, it's still
> possible for the batch of messages to get into Kafka fine, but the ack from
> Kafka to be lost on its way back to the Producer. In that case the Producer
> erroneously believes the messages didn't get in, and might re-send them.
>
> You guys *haven't* solved that issue, right? I believe you write about it
> on
> the Kafka site.

Re: Transactional writing

Posted by Jun Rao <ju...@gmail.com>.
For this particular use case, you can potentially include the message
offset from broker A in the message sent to broker B. If the transaction
fails, you read the last message from broker B and use the included offset
to resume the consumer from broker A. This assumes that there is only a
single client writing to broker B (for a particular partition).
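
The idea can be sketched roughly as follows. Two Python lists stand in for
the partitions on broker A and broker B, and the field name source_offset, the
process step, and the fail_after crash switch are all hypothetical; the sketch
assumes a single writer to the output, as stated above.

```python
def process(value):
    # Placeholder for the "do something" step.
    return value.upper()


def resume_offset(output_log):
    """Input offset to resume from, derived from the last output message."""
    if not output_log:
        return 0
    return output_log[-1]["source_offset"] + 1


def run(input_log, output_log, fail_after=None):
    """Consume input_log and write results tagged with their source offset.

    fail_after simulates a crash once that many messages have been written.
    """
    written = 0
    for offset in range(resume_offset(output_log), len(input_log)):
        if fail_after is not None and written == fail_after:
            return  # simulated crash
        output_log.append(
            {"source_offset": offset, "value": process(input_log[offset])}
        )
        written += 1


input_log = ["x", "y", "z"]    # stand-in for the partition on broker A
output_log = []                # stand-in for the partition on broker B
run(input_log, output_log, fail_after=2)   # crashes before handling "z"
run(input_log, output_log)                 # restart resumes at offset 2
print([m["value"] for m in output_log])
```

Because the resume position is read from the output itself, no separate
consumer offset has to be committed atomically with the write to broker B.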

Thanks,

Jun

On Fri, Oct 26, 2012 at 11:31 AM, Guozhang Wang <gu...@cs.cornell.edu> wrote:

> I am also quite interested in this thread, and I have another question
> here to ask about committing consumed messages. For example, if I need a
> program which acts both as a consumer and a producer, and the actions are
> wrapped in a "transaction":
>
> Transaction start:
>
>    Get next message from broker A;
>
>    Do something;
>
>    Send a message to broker B;
>
> Commit.
>
>
> If the transaction aborts after reading the message from broker A, is it
> possible to logically "put the message back" to brokers? I remember that
> Amazon Queue Service uses some sort of lease mechanism, which might work for
> this case. But I am afraid that will affect the throughput a lot.
>
> --Guozhang

RE: Transactional writing

Posted by Guozhang Wang <gu...@cs.cornell.edu>.
I am also quite interested in this thread, and I have another question here to ask about committing consumed messages. For example, if I need a program which acts both as a consumer and a producer, and the actions are wrapped in a "transaction":

Transaction start:

   Get next message from broker A;

   Do something;

   Send a message to broker B;

Commit.


If the transaction aborts after reading the message from broker A, is it possible to logically "put the message back" to brokers? I remember that Amazon Queue Service uses some sort of lease mechanism, which might work for this case. But I am afraid that will affect the throughput a lot.

--Guozhang


-----Original Message-----
From: Jay Kreps [mailto:jay.kreps@gmail.com] 
Sent: Friday, October 26, 2012 2:08 PM
To: kafka-users@incubator.apache.org
Subject: Re: Transactional writing

This is an important feature and I am interested in helping out in the
design and implementation, though I am working on 0.8 features for the next
month so I may not be of too much use. I have thought a little bit about
this, but I am not yet sure of the best approach.

Here is a specific use case I think is important to address: consider a
case where you are doing processing of one or more streams and producing an
output stream. This processing may involve some kind of local state (say
counters or other local aggregation intermediate state). This is a common
scenario. The problem is to give reasonable semantics to this computation
in the presence of failures. The processor effectively has a
position/offset in each of its input streams as well as whatever local
state. The problem is that if this process fails it needs to restore to a
state that matches the last produced messages. There are several solutions
to this problem. One is to make the output somehow idempotent, this will
solve some cases but is not a general solution as many things cannot be
made idempotent easily.

I think the two proposals you give outline a couple of basic approaches:
1. Store the messages on the server somewhere but don't add them to the log
until the commit call
2. Store the messages in the log but don't make them available to the
consumer until the commit call
Another option you didn't mention:

I can give several subtleties to these approaches.

One advantage of the second approach is that messages are in the log and
can be available for reading or not. This makes it possible to support a
kind of "dirty read" that allows the consumer to specify whether they want
to immediately see all messages with low latency but potentially see
uncommitted messages or only see committed messages.

The problem with the second approach at least in the way you describe it is
that you have to lock the log until the commit occurs otherwise you can't
roll back (because otherwise someone else may have appended their own
messages and you can't truncate the log). This would have all the problems
of remote locks. I think this might be a deal-breaker.

Another variation on the second approach would be the following: have each
producer maintain an id and generation number. Keep a schedule of valid
offset/id/generation numbers on the broker and only hand these out. This
solution would support non-blocking multi-writer appends but requires more
participation from the producer (i.e. getting a generation number and id).

Cheers,

-Jay

On Thu, Oct 25, 2012 at 7:04 PM, Tom Brown <to...@gmail.com> wrote:

> I have come up with two different possibilities, both with different
> trade-offs.
>
> The first would be to support "true" transactions by writing
> transactional data into a temporary file and then copy it directly to
> the end of the partition when the commit command is created. The
> upside to this approach is that individual transactions can be larger
> than a single batch, and more than one producer could conduct
> transactions at once. The downside is the extra IO involved in writing
> it and reading it from disk an extra time.
>
> The second would be to allow any number of messages to be appended to
> a topic, but not move the "end of topic" offset until the commit was
> received. If a rollback was received, or the producer timed out, the
> partition could be truncated at the most recently recognized "end of
> topic" offset. The upside is that there is very little extra IO (only
> to store the official "end of topic" metadata), and it seems like it
> should be easy to implement. The downside is that the
> "transaction" feature is incompatible with anything but a single
> producer per partition.
>
> I am interested in your thoughts on these.
>
> --Tom

Re: Transactional writing

Posted by Jonathan Hodges <ho...@gmail.com>.
Awesome!  Thanks for the confirmation and continued great work on Kafka!


On Thu, Mar 28, 2013 at 9:22 AM, Jun Rao <ju...@gmail.com> wrote:

> Jonathan,
>
> With a single writer, the producer can achieve exactly-once writes. If a send
> request fails, the producer first checks the end of the log to see if the
> previous write succeeded or not. The producer will only resend if the
> previous write failed.
>
> To do this, the producer needs the offset of appended messages. In 0.8,
> such offsets are not returned in our high level producer API yet. We plan
> to extend our producer API post 0.8 to expose this information.
>
> Thanks,
>
> Jun
>

Re: Transactional writing

Posted by Jun Rao <ju...@gmail.com>.
Jonathan,

With a single writer, the producer can achieve exactly-once writes. If a
send request fails, the producer first checks the end of the log to see
whether the previous write succeeded. The producer resends only if the
previous write failed.

To do this, the producer needs the offset of appended messages. In 0.8,
such offsets are not yet returned by our high-level producer API. We plan
to extend our producer API post 0.8 to expose this information.

Thanks,

Jun
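
To make the check-before-resend idea above concrete, here is a minimal
Python sketch. It uses an in-memory stand-in for a single partition rather
than any real Kafka client: InMemoryLog, SendFailed, and
SingleWriterProducer are hypothetical names introduced only for
illustration, and the sketch assumes exactly one writer per partition, as
described above.

```python
class SendFailed(Exception):
    """The send failed; the append may or may not have been applied."""


class InMemoryLog:
    """Hypothetical stand-in for one partition (not a Kafka API)."""

    def __init__(self):
        self.messages = []
        self.fail_before_write = False  # simulate a request that never lands
        self.drop_next_ack = False      # simulate a lost acknowledgement

    def append(self, batch):
        if self.fail_before_write:
            self.fail_before_write = False
            raise SendFailed("request lost, nothing written")
        self.messages.extend(batch)
        if self.drop_next_ack:
            self.drop_next_ack = False
            raise SendFailed("batch written, ack lost")
        return len(self.messages)  # offset just past the appended batch

    def end_offset(self):
        return len(self.messages)

    def read(self, start, end):
        return self.messages[start:end]


class SingleWriterProducer:
    """Resends only when the end of the log shows the write did not land."""

    def __init__(self, log):
        self.log = log
        self.next_offset = log.end_offset()  # where our next batch should start

    def send(self, batch, retries=3):
        batch = list(batch)
        for _ in range(retries):
            try:
                self.next_offset = self.log.append(batch)
                return self.next_offset
            except SendFailed:
                # With a single writer, only our own batch can have moved
                # the end of the log, so inspect it before resending.
                expected_end = self.next_offset + len(batch)
                landed = (
                    self.log.end_offset() == expected_end
                    and self.log.read(self.next_offset, expected_end) == batch
                )
                if landed:
                    self.next_offset = expected_end
                    return self.next_offset
                # Otherwise fall through and resend on the next iteration.
        raise RuntimeError("send failed after retries")
```

The check is only sound because nobody else appends to the partition; with
several writers the end of the log could have moved for unrelated reasons.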

On Wed, Mar 27, 2013 at 2:41 PM, Jonathan Hodges <ho...@gmail.com> wrote:

> I know this is a really old thread, but it looked like the only pertinent
> one that came up when searching for ‘exactly once’ in the archives.  I just
> want to confirm my understanding of the 0.8 version in that it still
> doesn’t completely support exactly once semantics.  With the producer
> configured in sync mode and quorum commits there are still some edge case
> failure modes where the producer won’t receive the ack and resends the
> message(s).  I think I read that the consumers don’t see uncommitted
> messages in the log, but I don’t think that addresses this producer case.
> Please correct me if I am missing something here.
>
>
> Don’t get me wrong, we are very thankful for the 0.8 features.  It offers by
> far the best message delivery guarantees out of the products we evaluated,
> like Rabbit and ActiveMQ.  We attempt to make our downstream consumer
> processes idempotent to mitigate this edge case, but it isn’t always
> feasible.  Also the suggestion by Milind in this thread of using Storm for
> exactly once guarantees makes a lot of sense.  Trident State seems to
> address this very issue
> (https://github.com/nathanmarz/storm/wiki/Trident-state) so we could just
> have it mediate our topics that require exactly once.
>
>
> -Jonathan
>
>
>
> On Sat, Nov 3, 2012 at 1:53 PM, Milind Parikh <milindparikh@gmail.com>
> wrote:
>
> > Why wouldn't the storm approach provide semantics of exactly once
> > delivery? https://github.com/nathanmarz/storm
> >
> > Nathan actually credits the Kafka_devs for the basic idea of transaction
> > persisting in one of his talks.
> >
> > Regards
> > Milind
> >
> > On Nov 3, 2012 11:51 AM, "Rohit Prasad" <ro...@gmail.com> wrote:
> >
> > > I agree that this approach only prevents duplicate messages to the
> > > partition from the Producer side. There needs to be a similar approach
> > > on the consumer side too. Using ZK can be one solution, or other non-ZK
> > > approaches.
> > >
> > > Even if the Consumer reads none or all of the messages of a transaction,
> > > that does not solve the transaction problem yet, because the
> > > business/application logic inside the Consumer thread may execute
> > > partially and fail. So it becomes tricky to decide the point when you
> > > want to say that you have "consumed" the message and increase the
> > > consumption offset. If your consumer thread is saving some value into
> > > DB/HDFS/etc, ideally you want this save operation and the consumption
> > > offset increment to happen atomically. That's why it boils down to
> > > application logic implementing transactions and dealing with duplicates.
> > > Maybe a journalling or redo log approach on the Consumer side can help
> > > build such a system.
> > >
> > > It will be nice if eventually Kafka can be a transport which provides
> > > "exactly once" semantics for message delivery. Then consumer threads can
> > > be sure that they receive messages once, and can build application logic
> > > on top of that.
> > >
> > > I have a use case similar to what Jay mentioned in a previous mail. I
> > > want to do aggregation but want the aggregated data to be correct,
> > > possibly avoiding duplicates in case of failures/crashes.
> > >
> > >
> > >
> > > On Fri, Nov 2, 2012 at 4:12 PM, Tom Brown <to...@gmail.com> wrote:
> > >
> > > > That approach allows a producer to prevent duplicate messages to the
> > > > partition, but what about the consumer? In my case, I don't want the
> > > > consumer to be able to read any of the messages unless it can read all
> > > > of the messages from a transaction.
> > > >
> > > > I also like the idea of there being multiple types of Kafka
> > > > transaction, though, just to accommodate different performance,
> > > > reliability, and consumption patterns. Of course, the added complexity
> > > > of that might just sink the whole thing.
> > > >
> > > > --Tom
> > > >
> > > > On Fri, Nov 2, 2012 at 4:11 PM, Rohit Prasad <rohit.prasad15@gmail.com>
> > > > wrote:
> > > > > Getting transactional support is quite a hard problem. There will
> > > > > always be corner cases where the solution will not work, unless you
> > > > > want to go down the path of 2PC, Paxos, etc., which of course will
> > > > > degrade Kafka's performance. It is best to reconcile data and deal
> > > > > with duplicate messages in the application layer. Having said that,
> > > > > it would be amazing if we can build "at most once" semantics in
> > > > > Kafka!!
> > > > >
> > > > > Regarding the above approaches: the producer will always have a doubt
> > > > > whether its commit went through, i.e. if the ack for "commit" is not
> > > > > received by the producer, or if the producer dies immediately after
> > > > > calling the commit. When it is restarted, how does it know if the
> > > > > last operation went through?
> > > > >
> > > > > I suggest the following -
> > > > > 1. Producer should attach a timestamp at the beginning of each
> > > > > message and send it to the Server.
> > > > > 2. On restarts/timeouts/re-connections, the producer should first
> > > > > read the last committed message from the leader of the partition.
> > > > > 3. From the timestamp, it can know how many messages went through
> > > > > before it died (or the connection was broken). And it can infer how
> > > > > many messages to replay.
> > > > >
> > > > > The above approach can be used with existing Kafka libraries since
> > > > > you can have a producer and consumer thread together in an
> > > > > application to implement this logic. Or someone can take the
> > > > > initiative to write a Transactional producer (which internally has
> > > > > both a producer and a consumer to read the last committed message.)
> > > > > I will be developing one for Kafka 0.8 in C++.
> > > > >
> > > > > The above approach will work even if you batch messages for a single
> > > > > partition.
> > > > > The above approach will work only if a single producer is writing to
> > > > > a partition. I want to hear opinions about the above approach. I am
> > > > > sure there can be corner cases where it may break.
> > > > >
> > > > > If there are multiple producers to a partition, then some
> > > > > bookkeeping on the server side with regard to the last msg committed
> > > > > from a "co-relation id" (to identify a unique producer) may be
> > > > > needed.
> > > > >
> > > > >
> > > > > Regards,
> > > > > Rohit
> > > > >
> > > > >
> > > > > On Sun, Oct 28, 2012 at 10:31 PM, Jun Rao <ju...@gmail.com> wrote:
> > > > >
> > > > >> If you use Kafka just as a redo log, you can't undo anything that's
> > > > >> written to the log. Write-ahead logs in typical database systems are
> > > > >> both redo and undo logs. Transaction commits and rollbacks are
> > > > >> implemented on top of the logs. However, general-purpose write-ahead
> > > > >> logs for transactions are much more complicated.
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Jun
> > > > >>
> > > > >> On Fri, Oct 26, 2012 at 11:08 AM, Jay Kreps <ja...@gmail.com> wrote:
> > > > >>
> > > > >> > This is an important feature and I am interested in helping out in
> > > > >> > the design and implementation, though I am working on 0.8 features
> > > > >> > for the next month so I may not be of too much use. I have thought
> > > > >> > a little bit about this, but I am not yet sure of the best
> > > > >> > approach.
> > > > >> >
> > > > >> > Here is a specific use case I think is important to address:
> > > > >> > consider a case where you are doing processing of one or more
> > > > >> > streams and producing an output stream. This processing may involve
> > > > >> > some kind of local state (say counters or other local aggregation
> > > > >> > intermediate state). This is a common scenario. The problem is to
> > > > >> > give reasonable semantics to this computation in the presence of
> > > > >> > failures. The processor effectively has a position/offset in each
> > > > >> > of its input streams as well as whatever local state. The problem
> > > > >> > is that if this process fails it needs to restore to a state that
> > > > >> > matches the last produced messages. There are several solutions to
> > > > >> > this problem. One is to make the output somehow idempotent; this
> > > > >> > will solve some cases but is not a general solution as many things
> > > > >> > cannot be made idempotent easily.
> > > > >> >
> > > > >> > I think the two proposals you give outline a couple of basic
> > > > >> > approaches:
> > > > >> > 1. Store the messages on the server somewhere but don't add them to
> > > > >> > the log until the commit call
> > > > >> > 2. Store the messages in the log but don't make them available to
> > > > >> > the consumer until the commit call
> > > > >> > Another option you didn't mention:
> > > > >> >
> > > > >> > I can give several subtleties to these approaches.
> > > > >> >
> > > > >> > One advantage of the second approach is that messages are in the
> > > > >> > log and can be available for reading or not. This makes it possible
> > > > >> > to support a kind of "dirty read" that allows the consumer to
> > > > >> > specify whether they want to immediately see all messages with low
> > > > >> > latency but potentially see uncommitted messages or only see
> > > > >> > committed messages.
> > > > >> >
> > > > >> > The problem with the second approach, at least in the way you
> > > > >> > describe it, is that you have to lock the log until the commit
> > > > >> > occurs, otherwise you can't roll back (because otherwise someone
> > > > >> > else may have appended their own messages and you can't truncate
> > > > >> > the log). This would have all the problems of remote locks. I think
> > > > >> > this might be a deal-breaker.
> > > > >> >
> > > > >> > Another variation on the second approach would be the following:
> > > > >> > have each producer maintain an id and generation number. Keep a
> > > > >> > schedule of valid offset/id/generation numbers on the broker and
> > > > >> > only hand these out. This solution would support non-blocking
> > > > >> > multi-writer appends but requires more participation from the
> > > > >> > producer (i.e. getting a generation number and id).
> > > > >> >
> > > > >> > Cheers,
> > > > >> >
> > > > >> > -Jay
> > > > >> >
> > > > >> > On Thu, Oct 25, 2012 at 7:04 PM, Tom Brown <tombrown52@gmail.com>
> > > > >> > wrote:
> > > > >> >
> > > > >> > > I have come up with two different possibilities, both with
> > > > >> > > different trade-offs.
> > > > >> > >
> > > > >> > > The first would be to support "true" transactions by writing
> > > > >> > > transactional data into a temporary file and then copying it
> > > > >> > > directly to the end of the partition when the commit command is
> > > > >> > > created. The upside to this approach is that individual
> > > > >> > > transactions can be larger than a single batch, and more than one
> > > > >> > > producer could conduct transactions at once. The downside is the
> > > > >> > > extra IO involved in writing it and reading it from disk an extra
> > > > >> > > time.
> > > > >> > >
> > > > >> > > The second would be to allow any number of messages to be
> > > > >> > > appended to a topic, but not move the "end of topic" offset until
> > > > >> > > the commit was received. If a rollback was received, or the
> > > > >> > > producer timed out, the partition could be truncated at the most
> > > > >> > > recently recognized "end of topic" offset. The upside is that
> > > > >> > > there is very little extra IO (only to store the official "end of
> > > > >> > > topic" metadata), and it seems like it should be easy to
> > > > >> > > implement. The downside is that this "transaction" feature is
> > > > >> > > incompatible with anything but a single producer per partition.
> > > > >> > >
> > > > >> > > I am interested in your thoughts on these.
> > > > >> > >
> > > > >> > > --Tom
> > > > >> > >
> > > > >> > > On Thu, Oct 25, 2012 at 9:31 PM, Philip O'Toole
> > > > >> > > <philip@loggly.com> wrote:
> > > > >> > > > On Thu, Oct 25, 2012 at 06:19:04PM -0700, Neha Narkhede wrote:
> > > > >> > > >> The closest concept of transaction on the publisher side, that
> > > > >> > > >> I can think of, is using batch of messages in a single call to
> > > > >> > > >> the synchronous producer.
> > > > >> > > >>
> > > > >> > > >> Precisely, you can configure a Kafka producer to use the "sync"
> > > > >> > > >> mode and batch messages that require transactional guarantees
> > > > >> > > >> in a single send() call. That will ensure that either all the
> > > > >> > > >> messages in the batch are sent or none.
> > > > >> > > >
> > > > >> > > > This is an interesting feature -- something I wasn't aware of.
> > > > >> > > > Still it doesn't solve the problem *completely*. As many people
> > > > >> > > > realise, it's still possible for the batch of messages to get
> > > > >> > > > into Kafka fine, but the ack from Kafka to be lost on its way
> > > > >> > > > back to the Producer. In that case the Producer erroneously
> > > > >> > > > believes the messages didn't get in, and might re-send them.
> > > > >> > > >
> > > > >> > > > You guys *haven't* solved that issue, right? I believe you
> > > > >> > > > write about it on the Kafka site.
> > > > >> > > >
> > > > >> > > >>
> > > > >> > > >> Thanks,
> > > > >> > > >> Neha
> > > > >> > > >>
> > > > >> > > >> On Thu, Oct 25, 2012 at 2:44 PM, Tom Brown
> > > > >> > > >> <tombrown52@gmail.com> wrote:
> > > > >> > > >> > Is there an accepted, or recommended way to make writes to a
> > > > >> > > >> > Kafka queue idempotent, or within a transaction?
> > > > >> > > >> >
> > > > >> > > >> > I can configure my system such that each queue has exactly
> > > > >> > > >> > one producer.
> > > > >> > > >> >
> > > > >> > > >> > (If there are no accepted/recommended ways, I have a few
> > > > >> > > >> > ideas I would like to propose. I would also be willing to
> > > > >> > > >> > implement them if needed)
> > > > >> > > >> >
> > > > >> > > >> > Thanks in advance!
> > > > >> > > >> >
> > > > >> > > >> > --Tom
> > > > >> > > >
> > > > >> > > > --
> > > > >> > > > Philip O'Toole
> > > > >> > > >
> > > > >> > > > Senior Developer
> > > > >> > > > Loggly, Inc.
> > > > >> > > > San Francisco, Calif.
> > > > >> > > > www.loggly.com
> > > > >> > > >
> > > > >> > > > Come join us!
> > > > >> > > > http://loggly.com/company/careers/
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > >
> >
>

Re: Transactional writing

Posted by Jonathan Hodges <ho...@gmail.com>.
I know this is a really old thread, but it looked like the only pertinent
one that came up when searching for ‘exactly once’ in the archives. I just
want to confirm my understanding that the 0.8 version still doesn’t
completely support exactly-once semantics. With the producer configured in
sync mode and quorum commits, there are still some edge-case failure modes
where the producer won’t receive the ack and resends the message(s). I
think I read that consumers don’t see uncommitted messages in the log, but
I don’t think that addresses this producer case. Please correct me if I am
missing something here.


Don’t get me wrong, we are very thankful for the 0.8 features. It offers by
far the best message delivery guarantees of the products we evaluated,
such as Rabbit and ActiveMQ. We attempt to make our downstream consumer
processes idempotent to mitigate this edge case, but it isn’t always
feasible. Also, the suggestion by Milind in this thread of using Storm for
exactly-once guarantees makes a lot of sense. Trident State seems to
address this very issue
(https://github.com/nathanmarz/storm/wiki/Trident-state), so we could just
have it mediate our topics that require exactly once.
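
As a rough illustration of the idempotent-consumer mitigation, here is a
minimal sketch. It assumes each message carries a unique ID chosen by the
producer, which Kafka 0.8 does not provide on its own; `IdempotentSink` and
`msg_id` are hypothetical names, and the set of seen IDs would need to be
stored durably in a real system.

```python
class IdempotentSink:
    """Applies each message at most once, keyed by a producer-chosen ID."""

    def __init__(self):
        self.seen = set()   # IDs already applied (in-memory only in this sketch)
        self.total = 0      # example downstream state: a running sum

    def apply(self, msg_id, value):
        # A re-sent message carries the same ID, so it is skipped.
        if msg_id in self.seen:
            return False
        self.seen.add(msg_id)
        self.total += value
        return True


sink = IdempotentSink()
# The producer missed the ack for "m2" and sent it a second time.
for msg_id, value in [("m1", 10), ("m2", 5), ("m2", 5), ("m3", 1)]:
    sink.apply(msg_id, value)
print(sink.total)  # the duplicate "m2" is counted only once
```

This only works when the downstream effect can be keyed by such an ID, which
is the part that isn't always feasible.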


-Jonathan



On Sat, Nov 3, 2012 at 1:53 PM, Milind Parikh <mi...@gmail.com> wrote:

> Why wouldn't the storm approach provide semantics of exactly once
> delivery? https://github.com/nathanmarz/storm
>
> Nathan actually credits the Kafka_devs for the basic idea of transaction
> persisting in one of his talks.
>
> Regards
> Milind

Re: Transactional writing

Posted by Milind Parikh <mi...@gmail.com>.
Why wouldn't the storm approach provide semantics of exactly once
delivery? https://github.com/nathanmarz/storm

Nathan actually credits the Kafka_devs for the basic idea of transaction
persisting in one of his talks.

Regards
Milind

On Nov 3, 2012 11:51 AM, "Rohit Prasad" <ro...@gmail.com> wrote:

> I agree that this approach only prevents duplicate messages to partition
> from the Producer side. There needs to be a similar approach on the
> consumer side too. Using Zk can be one solution, or other non-ZK
> approaches.
>
> Even if Consumer reads none or all messages of a transaction. But that does
> not solve the transaction problem yet. Because the business/application
> logic inside the Consumer thread may execute partially and fail. So it
> becomes tricky to decide the point when you want to say that you have
> "consumed" the message and increase consumption offset. If your consumer
> thread is saving some value  into DB/HDFS/etc, ideally you want this save
> operation and consumption offset to be incremented atomically. Thats why it
> boils down to Application logic implementing transactions and dealing with
> duplicates.
> Maybe a journalling or redo log approach on Consumer side can help build
> such a system.
>
> It will be nice if eventually kafka can be a transport which provides
> "exactly once" semantics for message delivery. Then consumer threads can be
> sure that they receive messages once, and can build appln logic on top of
> that.
>
> I have a use case similar to what Jay mentioned in a previous mail. I want
> to do aggregation but want the aggregated data to be correct, possible
> avoiding duplicates incase of failures/crashes.

Re: Transactional writing

Posted by Rohit Prasad <ro...@gmail.com>.
I agree that this approach only prevents duplicate messages to a partition
from the producer side. There needs to be a similar approach on the
consumer side too. Using ZK can be one solution, or other non-ZK approaches.

Even if the consumer reads either none or all of the messages of a
transaction, that does not yet solve the transaction problem, because the
business/application logic inside the consumer thread may execute partially
and fail. So it becomes tricky to decide the point at which you can say that
you have "consumed" the message and increase the consumption offset. If your
consumer thread is saving some value into DB/HDFS/etc., ideally you want the
save operation and the offset increment to happen atomically. That's why it
boils down to application logic implementing transactions and dealing with
duplicates.
Maybe a journalling or redo-log approach on the consumer side can help build
such a system.
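
To make the "save and offset together" idea concrete, here is a minimal
sketch that keeps the consumed offset in the same database as the result and
updates both in one transaction. SQLite stands in for the real store; the
table names and the `process` function are made up for illustration, and no
Kafka client is involved.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE totals (k TEXT PRIMARY KEY, v INTEGER)")
db.execute("CREATE TABLE offsets (part INTEGER PRIMARY KEY, next_offset INTEGER)")
db.execute("INSERT INTO offsets VALUES (0, 0)")
db.commit()


def process(db, partition, offset, key, amount):
    """Apply one message and advance the offset in a single transaction."""
    (expected,) = db.execute(
        "SELECT next_offset FROM offsets WHERE part = ?", (partition,)
    ).fetchone()
    if offset < expected:
        return False  # already applied before a crash or replay, so skip it
    with db:  # commits both writes together, or rolls both back on error
        db.execute("INSERT OR IGNORE INTO totals VALUES (?, 0)", (key,))
        db.execute("UPDATE totals SET v = v + ? WHERE k = ?", (amount, key))
        db.execute(
            "UPDATE offsets SET next_offset = ? WHERE part = ?",
            (offset + 1, partition),
        )
    return True


# Offsets 0 and 1 are consumed, then offset 1 is redelivered after a restart.
for off, amount in [(0, 10), (1, 5), (1, 5)]:
    process(db, 0, off, "clicks", amount)
print(db.execute("SELECT v FROM totals WHERE k = 'clicks'").fetchone()[0])
```

On restart the consumer would read `next_offset` from the database rather
than from ZK, so the result and the position can never disagree.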

It would be nice if Kafka could eventually be a transport that provides
"exactly once" semantics for message delivery. Then consumer threads could be
sure that they receive each message once, and could build application logic
on top of that.

I have a use case similar to what Jay mentioned in a previous mail. I want
to do aggregation but want the aggregated data to be correct, possibly
avoiding duplicates in case of failures/crashes.



On Fri, Nov 2, 2012 at 4:12 PM, Tom Brown <to...@gmail.com> wrote:

> That approach allows a producer to prevent duplicate messages to the
> partition, but what about the consumer? In my case, I don't want the
> consumer to be able to read any of the messages unless it can read all
> of the messages from a transaction.
>
> I also like the idea of there being multiple types of Kafka
> transaction, though, just to accommodate different performance,
> reliability, and consumption patterns. Of course, the added complexity
> of that might just sink the whole thing.
>
> --Tom
>
> On Fri, Nov 2, 2012 at 4:11 PM, Rohit Prasad <ro...@gmail.com> wrote:
> > Getting transactional support is quite hard problem. There will always be
> > corner cases where the solution will not work, unless you want to go down
> > the path of 2PC, paxos, etc which ofcourse will degrade kafka's
> > performance. It is best to reconcile data and deal with duplicate messages
> > in Application layer. Having said that it would be amazing if we can build
> > "at most once" semantics in Kafka!!
> >
> > Regarding above approaches,
> > The producer will always have a doubt if its commit went through. i.e. if
> > the ack for "commit" is not received by the producer. Or If producer dies
> > immediately after calling the commit. When it is restarted how does it know
> > if last operation went through?
> >
> > I suggest the following -
> > 1. Producer should attach a timestamp at the beginning of each message and
> > send it to Server.
> > 2. On restarts/timeouts/re-connections, the producer should first read the
> > last committed message from the leader of the partition.
> > 3. From timestamp, it can know how many messages went through before it
> > died (or connection was broken). And it can infer how many messages to
> > replay.
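
The three steps above can be sketched roughly as follows. This is only an
illustration under stated assumptions: a plain list stands in for the
partition, there is exactly one producer, and the stamps strictly increase
(wall-clock timestamps can collide or go backwards, so a sequence counter is
the safer choice). `messages_to_replay` is a hypothetical helper, not part of
any Kafka library.

```python
def messages_to_replay(pending, last_committed_stamp):
    """Return the pending (stamp, payload) pairs the broker has not stored."""
    if last_committed_stamp is None:   # partition is empty: replay everything
        return list(pending)
    return [m for m in pending if m[0] > last_committed_stamp]


partition = []                                   # stands in for the partition log
pending = [(101, "a"), (102, "b"), (103, "c")]   # step 1: stamped messages

# The first two sends reached the broker, but the ack for 102 was lost
# and the producer restarted without knowing how far it had got.
partition.extend(pending[:2])

last = partition[-1][0] if partition else None   # step 2: read the last message
for msg in messages_to_replay(pending, last):    # step 3: replay only the rest
    partition.append(msg)

print([stamp for stamp, _ in partition])
```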
> >
> > The above approach can be used with existing Kafka libraries since you can
> > have a producer and consumer thread together in an application to implement
> > this logic. Or someone can take the initiative to write a Transactional
> > producer (which internally has both producer and a consumer to read last
> > committed message.) I will be developing one for kafka 0.8 in c++.
> >
> > The above approach will work even if you batch messages for a single
> > partition.
> > The above approach will work only if a single producer is writing to a
> > partition. I want hear opinions about the above approach. I sure there can
> > be corner-cases where it may break.
> >
> > If there are multiple producers to a partition, then some book keeping on
> > server side with regards to last msg committed from a "co-relation id" (to
> > identify unique producer) may be needed.
> >
> >
> > Regards,
> > Rohit
> >
> >
> > On Sun, Oct 28, 2012 at 10:31 PM, Jun Rao <ju...@gmail.com> wrote:
> >
> >> If you use Kafka just as a redo log, you can't undo anything that's written
> >> to the log. Write-ahead logs in typical database systems are both redo and
> >> undo logs. Transaction commits and rollbacks are implemented on top of the
> >> logs. However, general-purpose write-ahead logs for transactions are much
> >> more complicated.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >> On Fri, Oct 26, 2012 at 11:08 AM, Jay Kreps <ja...@gmail.com> wrote:
> >>
> >> > This is an important feature and I am interested in helping out in the
> >> > design and implementation, though I am working on 0.8 features for the next
> >> > month so I may not be of too much use. I have thought a little bit about
> >> > this, but I am not yet sure of the best approach.
> >> >
> >> > Here is a specific use case I think is important to address: consider a
> >> > case where you are doing processing of one or more streams and producing an
> >> > output stream. This processing may involve some kind of local state (say
> >> > counters or other local aggregation intermediate state). This is a common
> >> > scenario. The problem is to give reasonable semantics to this computation
> >> > in the presence of failures. The processor effectively has a
> >> > position/offset in each of its input streams as well as whatever local
> >> > state. The problem is that if this process fails it needs to restore to a
> >> > state that matches the last produced messages. There are several solutions
> >> > to this problem. One is to make the output somehow idempotent, this will
> >> > solve some cases but is not a general solution as many things cannot be
> >> > made idempotent easily.
> >> >
> >> > I think the two proposals you give outline a couple of basic approaches:
> >> > 1. Store the messages on the server somewhere but don't add them to the log
> >> > until the commit call
> >> > 2. Store the messages in the log but don't make them available to the
> >> > consumer until the commit call
> >> > Another option you didn't mention:
> >> >
> >> > I can give several subtleties to these approaches.
> >> >
> >> > One advantage of the second approach is that messages are in the log and
> >> > can be available for reading or not. This makes it possible to support a
> >> > kind of "dirty read" that allows the consumer to specify whether they want
> >> > to immediately see all messages with low latency but potentially see
> >> > uncommitted messages or only see committed messages.
> >> >
> >> > The problem with the second approach at least in the way you describe
> it
> >> is
> >> > that you have to lock the log until the commit occurs otherwise you
> can't
> >> > roll back (because otherwise someone else may have appended their own
> >> > messages and you can't truncate the log). This would have all the
> >> problems
> >> > of remote locks. I think this might be a deal-breaker.
> >> >
> >> > Another variation on the second approach would be the following: have
> >> each
> >> > producer maintain an id and generation number. Keep a schedule of
> valid
> >> > offset/id/generation numbers on the broker and only hand these out.
> This
> >> > solution would support non-blocking multi-writer appends but requires
> >> more
> >> > participation from the producer (i.e. getting a generation number and
> >> id).
> >> >
> >> > Cheers,
> >> >
> >> > -Jay
> >> >
> >> > On Thu, Oct 25, 2012 at 7:04 PM, Tom Brown <to...@gmail.com>
> wrote:
> >> >
> >> > > I have come up with two different possibilities, both with different
> >> > > trade-offs.
> >> > >
> >> > > The first would be to support "true" transactions by writing
> >> > > transactional data into a temporary file and then copy it directly
> to
> >> > > the end of the partition when the commit command is created. The
> >> > > upside to this approach is that individual transactions can be
> larger
> >> > > than a single batch, and more than one producer could conduct
> >> > > transactions at once. The downside is the extra IO involved in
> writing
> >> > > it and reading it from disk an extra time.
> >> > >
> >> > > The second would be to allow any number of messages to be appended
> to
> >> > > a topic, but not move the "end of topic" offset until the commit was
> >> > > received. If a rollback was received, or the producer timed out, the
> >> > > partition could be truncated at the most recently recognized "end of
> >> > > topic" offset. The upside is that there is very little extra IO
> (only
> >> > > to store the official "end of topic" metadata), and it seems like it
> >> > > should be easy to implement. The downside is that this the
> >> > > "transaction" feature is incompatible with anything but a single
> >> > > producer per partition.
> >> > >
> >> > > I am interested in your thoughts on these.
> >> > >
> >> > > --Tom
> >> > >
> >> > > On Thu, Oct 25, 2012 at 9:31 PM, Philip O'Toole <ph...@loggly.com>
> >> > wrote:
> >> > > > On Thu, Oct 25, 2012 at 06:19:04PM -0700, Neha Narkhede wrote:
> >> > > >> The closest concept of transaction on the publisher side, that I
> can
> >> > > >> think of, is using batch of messages in a single call to the
> >> > > >> synchronous producer.
> >> > > >>
> >> > > >> Precisely, you can configure a Kafka producer to use the "sync"
> mode
> >> > > >> and batch messages that require transactional guarantees in a
> >> > > >> single send() call. That will ensure that either all the
> messages in
> >> > > >> the batch are sent or none.
> >> > > >
> >> > > > This is an interesting feature -- something I wasn't aware of.
> Still
> >> it
> >> > > > doesn't solve the problem *completely*. As many people realise,
> it's
> >> > > still
> >> > > > possible for the batch of messages to get into Kafka fine, but the
> >> ack
> >> > > from
> >> > > > Kafka to be lost on its way back to the Producer. In that case the
> >> > > Producer
> >> > > > erroneously believes the messages didn't get in, and might re-send
> >> > them.
> >> > > >
> >> > > > You guys *haven't* solved that issue, right? I believe you write
> >> about
> >> > > it on
> >> > > > the Kafka site.
> >> > > >
> >> > > >>
> >> > > >> Thanks,
> >> > > >> Neha
> >> > > >>
> >> > > >> On Thu, Oct 25, 2012 at 2:44 PM, Tom Brown <tombrown52@gmail.com
> >
> >> > > wrote:
> >> > > >> > Is there an accepted, or recommended way to make writes to a
> Kafka
> >> > > >> > queue idempotent, or within a transaction?
> >> > > >> >
> >> > > >> > I can configure my system such that each queue has exactly one
> >> > > producer.
> >> > > >> >
> >> > > >> > (If there are no accepted/recommended ways, I have a few ideas
> I
> >> > would
> >> > > >> > like to propose. I would also be willing to implement them if
> >> > needed)
> >> > > >> >
> >> > > >> > Thanks in advance!
> >> > > >> >
> >> > > >> > --Tom
> >> > > >
> >> > > > --
> >> > > > Philip O'Toole
> >> > > >
> >> > > > Senior Developer
> >> > > > Loggly, Inc.
> >> > > > San Francisco, Calif.
> >> > > > www.loggly.com
> >> > > >
> >> > > > Come join us!
> >> > > > http://loggly.com/company/careers/
> >> > >
> >> >
> >>
>

Re: Transactional writing

Posted by Tom Brown <to...@gmail.com>.
That approach allows a producer to prevent duplicate messages to the
partition, but what about the consumer? In my case, I don't want the
consumer to be able to read any of the messages unless it can read all
of the messages from a transaction.
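
To make that requirement concrete, here is a minimal sketch of one way a
consumer could enforce it on its own side. It assumes a single producer per
partition and a hypothetical convention in which the producer writes explicit
"commit" and "abort" marker records; neither the markers nor the function name
are part of Kafka, they are only for illustration.

```python
from typing import Iterable, Iterator, List, Tuple

# Hypothetical record shape: (kind, payload), where kind is "data",
# "commit" or "abort". These markers are an assumption of this sketch.
Record = Tuple[str, str]


def committed_batches(records: Iterable[Record]) -> Iterator[List[str]]:
    """Yield each transaction's payloads only once its commit marker is seen."""
    pending: List[str] = []
    for kind, payload in records:
        if kind == "data":
            pending.append(payload)   # hold back until the outcome is known
        elif kind == "commit":
            yield pending             # release the whole transaction at once
            pending = []
        elif kind == "abort":
            pending = []              # drop the failed transaction entirely
    # Records still pending here belong to an unfinished transaction
    # and are never handed to the application.


log = [("data", "a"), ("data", "b"), ("commit", ""),
       ("data", "c"), ("abort", ""),
       ("data", "d")]
print(list(committed_batches(log)))
```

The cost of this design is that the consumer has to buffer a whole transaction
in memory before it can act on any of it.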

I also like the idea of there being multiple types of Kafka
transaction, though, just to accommodate different performance,
reliability, and consumption patterns. Of course, the added complexity
of that might just sink the whole thing.

--Tom

On Fri, Nov 2, 2012 at 4:11 PM, Rohit Prasad <ro...@gmail.com> wrote:
> Getting transactional support is quite a hard problem. There will always be
> corner cases where the solution will not work, unless you want to go down
> the path of 2PC, Paxos, etc., which of course will degrade Kafka's
> performance. It is best to reconcile data and deal with duplicate messages
> in the application layer. Having said that, it would be amazing if we could
> build "at most once" semantics in Kafka!!
>
> Regarding the above approaches:
> The producer will always be in doubt about whether its commit went through,
> e.g. if the ack for "commit" is not received by the producer, or if the
> producer dies immediately after calling commit. When it is restarted, how
> does it know if the last operation went through?
>
> I suggest the following -
> 1. The producer should attach a timestamp at the beginning of each message
> and send it to the server.
> 2. On restarts/timeouts/re-connections, the producer should first read the
> last committed message from the leader of the partition.
> 3. From the timestamp, it can know how many messages went through before it
> died (or the connection was broken), and it can infer how many messages to
> replay.
>
> The above approach can be used with existing Kafka libraries since you can
> have a producer and consumer thread together in an application to implement
> this logic. Or someone can take the initiative to write a transactional
> producer (which internally has both a producer and a consumer to read the
> last committed message). I will be developing one for Kafka 0.8 in C++.
>
> The above approach will work even if you batch messages for a single
> partition.
> The above approach will work only if a single producer is writing to a
> partition. I want to hear opinions about the above approach. I am sure there
> can be corner cases where it may break.
>
> If there are multiple producers to a partition, then some bookkeeping on the
> server side with regard to the last message committed from a "correlation id"
> (to identify a unique producer) may be needed.
>
>
> Regards,
> Rohit
>
>

Re: Transactional writing

Posted by Rohit Prasad <ro...@gmail.com>.
Getting transactional support is quite a hard problem. There will always be
corner cases where the solution will not work, unless you want to go down
the path of 2PC, Paxos, etc., which of course will degrade Kafka's
performance. It is best to reconcile data and deal with duplicate messages
in the application layer. Having said that, it would be amazing if we could
build "at most once" semantics in Kafka!!

Regarding the above approaches:
The producer will always be in doubt about whether its commit went through,
e.g. if the ack for "commit" is not received by the producer, or if the
producer dies immediately after calling commit. When it is restarted, how
does it know if the last operation went through?

I suggest the following -
1. The producer should attach a timestamp at the beginning of each message
and send it to the server.
2. On restarts/timeouts/re-connections, the producer should first read the
last committed message from the leader of the partition.
3. From the timestamp, it can know how many messages went through before it
died (or the connection was broken), and it can infer how many messages to
replay.
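
The three steps can be sketched roughly as follows. This is only an
illustration under stated assumptions: PartitionLog is an in-memory stand-in
for the partition on the leader, the "timestamp" is a strictly increasing
non-negative integer assigned by the single producer, and the names
PartitionLog and messages_to_replay are hypothetical, not Kafka APIs.

```python
from typing import List, Optional, Tuple

Message = Tuple[int, str]  # (producer-assigned timestamp, payload)


class PartitionLog:
    """In-memory stand-in for one partition on the leader (hypothetical)."""

    def __init__(self) -> None:
        self.messages: List[Message] = []

    def append(self, msg: Message) -> None:
        self.messages.append(msg)

    def last(self) -> Optional[Message]:
        return self.messages[-1] if self.messages else None


def messages_to_replay(log: PartitionLog, unacked: List[Message]) -> List[Message]:
    """Return the unacknowledged messages that never reached the log."""
    last = log.last()
    last_ts = last[0] if last is not None else -1
    # Relies on a single producer whose timestamps strictly increase.
    return [m for m in unacked if m[0] > last_ts]


log = PartitionLog()
sent = [(1, "a"), (2, "b"), (3, "c"), (4, "d")]
for m in sent[:3]:       # the first three were written, but only the
    log.append(m)        # ack for message 1 reached the producer
unacked = sent[1:]       # the producer is unsure about messages 2, 3 and 4

for m in messages_to_replay(log, unacked):
    log.append(m)        # only message 4 is actually re-sent
print(log.messages)
```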

The above approach can be used with existing Kafka libraries since you can
have a producer and consumer thread together in an application to implement
this logic. Or someone can take the initiative to write a transactional
producer (which internally has both a producer and a consumer to read the
last committed message). I will be developing one for Kafka 0.8 in C++.

The above approach will work even if you batch messages for a single
partition.
The above approach will work only if a single producer is writing to a
partition. I want to hear opinions about the above approach. I am sure there
can be corner cases where it may break.

If there are multiple producers to a partition, then some bookkeeping on the
server side with regard to the last message committed from a "correlation id"
(to identify a unique producer) may be needed.


Regards,
Rohit


On Sun, Oct 28, 2012 at 10:31 PM, Jun Rao <ju...@gmail.com> wrote:

> If you use Kafka just as a redo log, you can't undo anything that's written
> to the log. Write-ahead logs in typical database systems are both redo and
> undo logs. Transaction commits and rollbacks are implemented on top of the
> logs. However, general-purpose write-ahead logs for transactions are much
> more complicated.
>
> Thanks,
>
> Jun
>

Re: Transactional writing

Posted by Jun Rao <ju...@gmail.com>.
If you use Kafka just as a redo log, you can't undo anything that's written
to the log. Write-ahead logs in typical database systems are both redo and
undo logs. Transaction commits and rollbacks are implemented on top of the
logs. However, general-purpose write-ahead logs for transactions are much
more complicated.

Thanks,

Jun

On Fri, Oct 26, 2012 at 11:08 AM, Jay Kreps <ja...@gmail.com> wrote:

> This is an important feature and I am interested in helping out in the
> design and implementation, though I am working on 0.8 features for the next
> month so I may not be of too much use. I have thought a little bit about
> this, but I am not yet sure of the best approach.
>
> Here is a specific use case I think is important to address: consider a
> case where you are doing processing of one or more streams and producing an
> output stream. This processing may involve some kind of local state (say
> counters or other local aggregation intermediate state). This is a common
> scenario. The problem is to give reasonable semantics to this computation
> in the presence of failures. The processor effectively has a
> position/offset in each of its input streams as well as whatever local
> state. The problem is that if this process fails it needs to restore to a
> state that matches the last produced messages. There are several solutions
> to this problem. One is to make the output somehow idempotent; this will
> solve some cases but is not a general solution, as many things cannot be
> made idempotent easily.
>
> I think the two proposals you give outline a couple of basic approaches:
> 1. Store the messages on the server somewhere but don't add them to the log
> until the commit call
> 2. Store the messages in the log but don't make them available to the
> consumer until the commit call
> Another option you didn't mention:
>
> I can give several subtleties to these approaches.
>
> One advantage of the second approach is that messages are in the log and
> can be available for reading or not. This makes it possible to support a
> kind of "dirty read" that allows the consumer to specify whether they want
> to immediately see all messages with low latency but potentially see
> uncommitted messages or only see committed messages.
>
> The problem with the second approach, at least in the way you describe it,
> is that you have to lock the log until the commit occurs; otherwise you
> can't roll back (because someone else may have appended their own
> messages and you can't truncate the log). This would have all the problems
> of remote locks. I think this might be a deal-breaker.
>
> Another variation on the second approach would be the following: have each
> producer maintain an id and generation number. Keep a schedule of valid
> offset/id/generation numbers on the broker and only hand these out. This
> solution would support non-blocking multi-writer appends but requires more
> participation from the producer (i.e. getting a generation number and id).
>
> Cheers,
>
> -Jay
>

Re: Transactional writing

Posted by Jay Kreps <ja...@gmail.com>.
This is an important feature and I am interested in helping out in the
design and implementation, though I am working on 0.8 features for the next
month so I may not be of too much use. I have thought a little bit about
this, but I am not yet sure of the best approach.

Here is a specific use case I think is important to address: consider a
case where you are doing processing of one or more streams and producing an
output stream. This processing may involve some kind of local state (say
counters or other local aggregation intermediate state). This is a common
scenario. The problem is to give reasonable semantics to this computation
in the presence of failures. The processor effectively has a
position/offset in each of its input streams as well as whatever local
state. The problem is that if this process fails it needs to restore to a
state that matches the last produced messages. There are several solutions
to this problem. One is to make the output somehow idempotent; this will
solve some cases but is not a general solution, as many things cannot be
made idempotent easily.

I think the two proposals you give outline a couple of basic approaches:
1. Store the messages on the server somewhere but don't add them to the log
until the commit call
2. Store the messages in the log but don't make them available to the
consumer until the commit call
Another option you didn't mention:

I can give several subtleties to these approaches.

One advantage of the second approach is that messages are in the log and
can be available for reading or not. This makes it possible to support a
kind of "dirty read" that allows the consumer to specify whether they want
to immediately see all messages with low latency but potentially see
uncommitted messages or only see committed messages.
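
To make the "dirty read" idea concrete, here is a minimal in-memory sketch. It is a toy model, not Kafka code: the `Log` class, the `committed` marker, and the `dirty` flag are all hypothetical names chosen for illustration.

```python
class Log:
    """Toy in-memory partition log with a committed-offset marker (hypothetical)."""

    def __init__(self):
        self.messages = []   # every appended message, committed or not
        self.committed = 0   # messages[:committed] are visible to normal readers

    def append(self, msg):
        self.messages.append(msg)

    def commit(self):
        # Advance the marker so everything appended so far becomes visible.
        self.committed = len(self.messages)

    def read(self, offset, dirty=False):
        # A dirty read sees everything in the log; a normal read stops at the marker.
        end = len(self.messages) if dirty else self.committed
        return self.messages[offset:end]


log = Log()
log.append("a")
log.commit()
log.append("b")                 # appended but not yet committed
print(log.read(0))              # ['a']
print(log.read(0, dirty=True))  # ['a', 'b']
```

The consumer picks its own trade-off per read: lower latency with possible uncommitted data, or committed data only.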

The problem with the second approach, at least in the way you describe it, is
that you have to lock the log until the commit occurs; otherwise you can't
roll back (because someone else may have appended their own
messages, and then you can't truncate the log). This would have all the problems
of remote locks. I think this might be a deal-breaker.

Another variation on the second approach would be the following: have each
producer maintain an id and generation number. Keep a schedule of valid
offset/id/generation numbers on the broker and only hand these out. This
solution would support non-blocking multi-writer appends but requires more
participation from the producer (i.e. getting a generation number and id).
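
One possible reading of this variation, sketched as a toy model: every log entry is tagged with a producer id and generation, and the broker hands out only entries whose (id, generation) pair has been committed. The `Broker` class and its method names are hypothetical and not part of any Kafka API; the paragraph above leaves the details open, so treat this as an assumption.

```python
class Broker:
    """Toy broker: entries are tagged, and only committed tags are readable."""

    def __init__(self):
        self.log = []           # (producer_id, generation, payload) in append order
        self.next_gen = {}      # producer_id -> next generation number to hand out
        self.committed = set()  # {(producer_id, generation)} that consumers may see

    def begin(self, producer_id):
        # Hand the producer a fresh generation number for its next transaction.
        gen = self.next_gen.get(producer_id, 0)
        self.next_gen[producer_id] = gen + 1
        return gen

    def append(self, producer_id, gen, payload):
        # Appends from different producers interleave freely; no lock is held.
        self.log.append((producer_id, gen, payload))

    def commit(self, producer_id, gen):
        self.committed.add((producer_id, gen))

    def read(self):
        # Uncommitted or abandoned generations are skipped, never truncated.
        return [p for (pid, g, p) in self.log if (pid, g) in self.committed]


broker = Broker()
g1 = broker.begin("p1")
g2 = broker.begin("p2")
broker.append("p1", g1, "a")
broker.append("p2", g2, "x")   # p2 never commits this generation
broker.append("p1", g1, "b")
broker.commit("p1", g1)
print(broker.read())           # ['a', 'b']
```

Because rollback is just "never commit", no truncation is needed, which is what removes the need to lock the log.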

Cheers,

-Jay

On Thu, Oct 25, 2012 at 7:04 PM, Tom Brown <to...@gmail.com> wrote:

> I have come up with two different possibilities, both with different
> trade-offs.
>
> The first would be to support "true" transactions by writing
> transactional data into a temporary file and then copy it directly to
> the end of the partition when the commit command is created. The
> upside to this approach is that individual transactions can be larger
> than a single batch, and more than one producer could conduct
> transactions at once. The downside is the extra IO involved in writing
> it and reading it from disk an extra time.
>
> The second would be to allow any number of messages to be appended to
> a topic, but not move the "end of topic" offset until the commit was
> received. If a rollback was received, or the producer timed out, the
> partition could be truncated at the most recently recognized "end of
> topic" offset. The upside is that there is very little extra IO (only
> to store the official "end of topic" metadata), and it seems like it
> should be easy to implement. The downside is that this the
> "transaction" feature is incompatible with anything but a single
> producer per partition.
>
> I am interested in your thoughts on these.
>
> --Tom
>
> On Thu, Oct 25, 2012 at 9:31 PM, Philip O'Toole <ph...@loggly.com> wrote:
> > On Thu, Oct 25, 2012 at 06:19:04PM -0700, Neha Narkhede wrote:
> >> The closest concept of transaction on the publisher side, that I can
> >> think of, is using batch of messages in a single call to the
> >> synchronous producer.
> >>
> >> Precisely, you can configure a Kafka producer to use the "sync" mode
> >> and batch messages that require transactional guarantees in a
> >> single send() call. That will ensure that either all the messages in
> >> the batch are sent or none.
> >
> > This is an interesting feature -- something I wasn't aware of. Still it
> > doesn't solve the problem *completely*. As many people realise, it's
> still
> > possible for the batch of messages to get into Kafka fine, but the ack
> from
> > Kafka to be lost on its way back to the Producer. In that case the
> Producer
> > erroneously believes the messages didn't get in, and might re-send them.
> >
> > You guys *haven't* solved that issue, right? I believe you write about
> it on
> > the Kafka site.
> >
> >>
> >> Thanks,
> >> Neha
> >>
> >> On Thu, Oct 25, 2012 at 2:44 PM, Tom Brown <to...@gmail.com>
> wrote:
> >> > Is there an accepted, or recommended way to make writes to a Kafka
> >> > queue idempotent, or within a transaction?
> >> >
> >> > I can configure my system such that each queue has exactly one
> producer.
> >> >
> >> > (If there are no accepted/recommended ways, I have a few ideas I would
> >> > like to propose. I would also be willing to implement them if needed)
> >> >
> >> > Thanks in advance!
> >> >
> >> > --Tom
> >
> > --
> > Philip O'Toole
> >
> > Senior Developer
> > Loggly, Inc.
> > San Francisco, Calif.
> > www.loggly.com
> >
> > Come join us!
> > http://loggly.com/company/careers/
>

Re: Transactional writing

Posted by Evan chan <ev...@ooyala.com>.
A third possibility is to use a different storage backend, like Cassandra, which can easily support idempotent writes. You would hash the unique message ID and timestamp into row and column keys.
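
A rough sketch of why keyed writes are idempotent, using a plain dict as a stand-in for the keyed store (this is not the Cassandra API). Deriving the row key from a hash of the message ID and using the timestamp as the column key is one assumed mapping; `row_key`, `write`, and `store` are hypothetical names.

```python
import hashlib

store = {}  # stand-in for a keyed store: row key -> {column key -> payload}


def row_key(message_id):
    # Hash the unique message ID into a fixed-width row key.
    return hashlib.sha1(message_id.encode("utf-8")).hexdigest()


def write(message_id, timestamp, payload):
    # Writing the same (row, column) twice overwrites the same cell,
    # so a retried write leaves the store unchanged.
    store.setdefault(row_key(message_id), {})[timestamp] = payload


write("msg-42", 1351201458, "hello")
write("msg-42", 1351201458, "hello")   # retry after a lost ack
cells = sum(len(columns) for columns in store.values())
print(cells)   # 1
```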

Note that this scheme would possibly allow using it as a priority queue.

-Evan
Carry your candle, run to the darkness
Seek out the helpless, deceived and poor
Hold out your candle for all to see it
Take your candle, and go light your world
 


On Oct 25, 2012, at 7:04 PM, Tom Brown <to...@gmail.com> wrote:

> I have come up with two different possibilities, both with different trade-offs.
> 
> The first would be to support "true" transactions by writing
> transactional data into a temporary file and then copy it directly to
> the end of the partition when the commit command is created. The
> upside to this approach is that individual transactions can be larger
> than a single batch, and more than one producer could conduct
> transactions at once. The downside is the extra IO involved in writing
> it and reading it from disk an extra time.
> 
> The second would be to allow any number of messages to be appended to
> a topic, but not move the "end of topic" offset until the commit was
> received. If a rollback was received, or the producer timed out, the
> partition could be truncated at the most recently recognized "end of
> topic" offset. The upside is that there is very little extra IO (only
> to store the official "end of topic" metadata), and it seems like it
> should be easy to implement. The downside is that this the
> "transaction" feature is incompatible with anything but a single
> producer per partition.
> 
> I am interested in your thoughts on these.
> 
> --Tom
> 
> On Thu, Oct 25, 2012 at 9:31 PM, Philip O'Toole <ph...@loggly.com> wrote:
>> On Thu, Oct 25, 2012 at 06:19:04PM -0700, Neha Narkhede wrote:
>>> The closest concept of transaction on the publisher side, that I can
>>> think of, is using batch of messages in a single call to the
>>> synchronous producer.
>>> 
>>> Precisely, you can configure a Kafka producer to use the "sync" mode
>>> and batch messages that require transactional guarantees in a
>>> single send() call. That will ensure that either all the messages in
>>> the batch are sent or none.
>> 
>> This is an interesting feature -- something I wasn't aware of. Still it
>> doesn't solve the problem *completely*. As many people realise, it's still
>> possible for the batch of messages to get into Kafka fine, but the ack from
>> Kafka to be lost on its way back to the Producer. In that case the Producer
>> erroneously believes the messages didn't get in, and might re-send them.
>> 
>> You guys *haven't* solved that issue, right? I believe you write about it on
>> the Kafka site.
>> 
>>> 
>>> Thanks,
>>> Neha
>>> 
>>> On Thu, Oct 25, 2012 at 2:44 PM, Tom Brown <to...@gmail.com> wrote:
>>>> Is there an accepted, or recommended way to make writes to a Kafka
>>>> queue idempotent, or within a transaction?
>>>> 
>>>> I can configure my system such that each queue has exactly one producer.
>>>> 
>>>> (If there are no accepted/recommended ways, I have a few ideas I would
>>>> like to propose. I would also be willing to implement them if needed)
>>>> 
>>>> Thanks in advance!
>>>> 
>>>> --Tom
>> 
>> --
>> Philip O'Toole
>> 
>> Senior Developer
>> Loggly, Inc.
>> San Francisco, Calif.
>> www.loggly.com
>> 
>> Come join us!
>> http://loggly.com/company/careers/

Re: Transactional writing

Posted by Tom Brown <to...@gmail.com>.
I have come up with two different possibilities, both with different trade-offs.

The first would be to support "true" transactions by writing
transactional data into a temporary file and then copying it directly to
the end of the partition when the commit command is received. The
upside to this approach is that individual transactions can be larger
than a single batch, and more than one producer could conduct
transactions at once. The downside is the extra IO involved in writing
it and reading it from disk an extra time.
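
A minimal in-memory sketch of this first proposal, with a per-transaction list standing in for the temporary file. `StagedLog` and its methods are hypothetical names for illustration only.

```python
class StagedLog:
    """Toy model: writes are staged per transaction and appended on commit."""

    def __init__(self):
        self.log = []      # the partition: only committed messages land here
        self.staged = {}   # txn_id -> staged messages (stand-in for a temp file)

    def begin(self, txn_id):
        self.staged[txn_id] = []

    def write(self, txn_id, msg):
        self.staged[txn_id].append(msg)

    def commit(self, txn_id):
        # The extra copy: staged data is read back and appended to the log.
        self.log.extend(self.staged.pop(txn_id))

    def rollback(self, txn_id):
        # Nothing reached the log, so rollback just discards the staging area.
        self.staged.pop(txn_id, None)


log = StagedLog()
log.begin("t1")
log.begin("t2")
log.write("t1", "a")
log.write("t2", "x")
log.write("t1", "b")
log.commit("t1")
log.rollback("t2")
print(log.log)   # ['a', 'b']
```

Two transactions run concurrently here without interfering, at the cost of handling every message twice.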

The second would be to allow any number of messages to be appended to
a topic, but not move the "end of topic" offset until the commit was
received. If a rollback was received, or the producer timed out, the
partition could be truncated at the most recently recognized "end of
topic" offset. The upside is that there is very little extra IO (only
to store the official "end of topic" metadata), and it seems like it
should be easy to implement. The downside is that the
"transaction" feature is incompatible with anything but a single
producer per partition.
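
A minimal in-memory sketch of this second proposal, assuming a single producer per partition. `SingleWriterLog` and `end_of_topic` are hypothetical names, not Kafka internals.

```python
class SingleWriterLog:
    """Toy model: append freely, move the visible end only on commit."""

    def __init__(self):
        self.messages = []
        self.end_of_topic = 0   # consumers never read past this offset

    def append(self, msg):
        self.messages.append(msg)

    def commit(self):
        # The only extra state to persist is this one offset.
        self.end_of_topic = len(self.messages)

    def rollback(self):
        # Truncate back to the last committed end. This is only safe because
        # no other producer can have appended after it.
        del self.messages[self.end_of_topic:]

    def read(self, offset):
        return self.messages[offset:self.end_of_topic]


log = SingleWriterLog()
log.append("a")
log.commit()
log.append("b")
print(log.read(0))     # ['a']
log.rollback()
print(log.messages)    # ['a']
```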

I am interested in your thoughts on these.

--Tom

On Thu, Oct 25, 2012 at 9:31 PM, Philip O'Toole <ph...@loggly.com> wrote:
> On Thu, Oct 25, 2012 at 06:19:04PM -0700, Neha Narkhede wrote:
>> The closest concept of transaction on the publisher side, that I can
>> think of, is using batch of messages in a single call to the
>> synchronous producer.
>>
>> Precisely, you can configure a Kafka producer to use the "sync" mode
>> and batch messages that require transactional guarantees in a
>> single send() call. That will ensure that either all the messages in
>> the batch are sent or none.
>
> This is an interesting feature -- something I wasn't aware of. Still it
> doesn't solve the problem *completely*. As many people realise, it's still
> possible for the batch of messages to get into Kafka fine, but the ack from
> Kafka to be lost on its way back to the Producer. In that case the Producer
> erroneously believes the messages didn't get in, and might re-send them.
>
> You guys *haven't* solved that issue, right? I believe you write about it on
> the Kafka site.
>
>>
>> Thanks,
>> Neha
>>
>> On Thu, Oct 25, 2012 at 2:44 PM, Tom Brown <to...@gmail.com> wrote:
>> > Is there an accepted, or recommended way to make writes to a Kafka
>> > queue idempotent, or within a transaction?
>> >
>> > I can configure my system such that each queue has exactly one producer.
>> >
>> > (If there are no accepted/recommended ways, I have a few ideas I would
>> > like to propose. I would also be willing to implement them if needed)
>> >
>> > Thanks in advance!
>> >
>> > --Tom
>
> --
> Philip O'Toole
>
> Senior Developer
> Loggly, Inc.
> San Francisco, Calif.
> www.loggly.com
>
> Come join us!
> http://loggly.com/company/careers/

Re: Transactional writing

Posted by Philip O'Toole <ph...@loggly.com>.
On Thu, Oct 25, 2012 at 06:19:04PM -0700, Neha Narkhede wrote:
> The closest concept of transaction on the publisher side, that I can
> think of, is using batch of messages in a single call to the
> synchronous producer.
> 
> Precisely, you can configure a Kafka producer to use the "sync" mode
> and batch messages that require transactional guarantees in a
> single send() call. That will ensure that either all the messages in
> the batch are sent or none.

This is an interesting feature -- something I wasn't aware of. Still it
doesn't solve the problem *completely*. As many people realise, it's still
possible for the batch of messages to get into Kafka fine, but the ack from
Kafka to be lost on its way back to the Producer. In that case the Producer
erroneously believes the messages didn't get in, and might re-send them.
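
The failure mode can be reproduced with a small simulation. `FlakyBroker` and `send_with_retry` are hypothetical stand-ins, not the Kafka producer API; the broker stores the batch and then "loses" the acknowledgement a configurable number of times.

```python
class FlakyBroker:
    """Toy broker that writes the batch but may drop the ack."""

    def __init__(self, drop_acks=0):
        self.log = []
        self.drop_acks = drop_acks   # how many acks to lose before behaving

    def send(self, batch):
        self.log.extend(batch)       # the write itself succeeds
        if self.drop_acks > 0:
            self.drop_acks -= 1
            raise TimeoutError("ack lost on the way back")
        return True


def send_with_retry(broker, batch, attempts=3):
    # A producer that cannot tell "write failed" from "ack lost" simply resends.
    for _ in range(attempts):
        try:
            return broker.send(batch)
        except TimeoutError:
            continue
    return False


broker = FlakyBroker(drop_acks=1)
ok = send_with_retry(broker, ["m1", "m2"])
print(ok)           # True
print(broker.log)   # ['m1', 'm2', 'm1', 'm2']
```

The producer sees a success, yet the log holds the batch twice.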

You guys *haven't* solved that issue, right? I believe you write about it on
the Kafka site.

> 
> Thanks,
> Neha
> 
> On Thu, Oct 25, 2012 at 2:44 PM, Tom Brown <to...@gmail.com> wrote:
> > Is there an accepted, or recommended way to make writes to a Kafka
> > queue idempotent, or within a transaction?
> >
> > I can configure my system such that each queue has exactly one producer.
> >
> > (If there are no accepted/recommended ways, I have a few ideas I would
> > like to propose. I would also be willing to implement them if needed)
> >
> > Thanks in advance!
> >
> > --Tom

-- 
Philip O'Toole

Senior Developer
Loggly, Inc.
San Francisco, Calif.
www.loggly.com

Come join us!
http://loggly.com/company/careers/

Re: Transactional writing

Posted by Jay Kreps <ja...@gmail.com>.
Yeah that is a good question. I think we started with good throughput and
scalability and are then adding in the features we can without breaking
these things or over-complicating the design. For example in 0.8 we have
*much* better delivery guarantees than most messaging systems (imo).

The use case I gave is my motivation for being interested in this. Giving
good semantics to incremental processing is an important use case and one
that often goes along with high throughput requirements (which is why you
wouldn't just use a traditional RDBMS).

I think the question here is whether there is an implementation of
transactions that works well in our design and doesn't kill performance.

-Jay

On Fri, Oct 26, 2012 at 11:29 AM, Jason Rosenberg <jb...@squareup.com> wrote:

> Correct me if I'm wrong, but I thought the intention of kafka was not
> really to handle this use case (e.g. transactional writing nor guaranteed
> delivery semantics).  Why wouldn't you use a jms queue system (e.g.
> activemq) if you need transactional messaging, backed by a database, etc.?
>
> Jason
>
>
> On Fri, Oct 26, 2012 at 11:18 AM, Jay Kreps <ja...@gmail.com> wrote:
>
> > There are a few oddities of using the batching feature to get a kind of
> > transactionality (that wasn't the original intention):
> > 1. It only actually works if you enable compression. Currently I don't
> > think we allow uncompressed recursive message batches. Without this the
> > batching only protects against producer failure, in the case of broker
> > failure their is no guarantee (either with or without replication) that
> you
> > won't fail in the middle of writing the batch. We could
> > consider separating out the batching from the compression support to make
> > this work in a more sane way.
> > 2. It doesn't work across partitions or topics.
> >
> > -Jay
> >
> > On Thu, Oct 25, 2012 at 6:19 PM, Neha Narkhede <neha.narkhede@gmail.com
> > >wrote:
> >
> > > The closest concept of transaction on the publisher side, that I can
> > > think of, is using batch of messages in a single call to the
> > > synchronous producer.
> > >
> > > Precisely, you can configure a Kafka producer to use the "sync" mode
> > > and batch messages that require transactional guarantees in a
> > > single send() call. That will ensure that either all the messages in
> > > the batch are sent or none.
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Thu, Oct 25, 2012 at 2:44 PM, Tom Brown <to...@gmail.com>
> wrote:
> > > > Is there an accepted, or recommended way to make writes to a Kafka
> > > > queue idempotent, or within a transaction?
> > > >
> > > > I can configure my system such that each queue has exactly one
> > producer.
> > > >
> > > > (If there are no accepted/recommended ways, I have a few ideas I
> would
> > > > like to propose. I would also be willing to implement them if needed)
> > > >
> > > > Thanks in advance!
> > > >
> > > > --Tom
> > >
> >
>

Re: Transactional writing

Posted by Jason Rosenberg <jb...@squareup.com>.
Correct me if I'm wrong, but I thought the intention of Kafka was not
really to handle this use case (e.g. transactional writing or guaranteed
delivery semantics).  Why wouldn't you use a JMS queue system (e.g.
ActiveMQ) if you need transactional messaging, backed by a database, etc.?

Jason


On Fri, Oct 26, 2012 at 11:18 AM, Jay Kreps <ja...@gmail.com> wrote:

> There are a few oddities of using the batching feature to get a kind of
> transactionality (that wasn't the original intention):
> 1. It only actually works if you enable compression. Currently I don't
> think we allow uncompressed recursive message batches. Without this the
> batching only protects against producer failure, in the case of broker
> failure their is no guarantee (either with or without replication) that you
> won't fail in the middle of writing the batch. We could
> consider separating out the batching from the compression support to make
> this work in a more sane way.
> 2. It doesn't work across partitions or topics.
>
> -Jay
>
> On Thu, Oct 25, 2012 at 6:19 PM, Neha Narkhede <neha.narkhede@gmail.com
> >wrote:
>
> > The closest concept of transaction on the publisher side, that I can
> > think of, is using batch of messages in a single call to the
> > synchronous producer.
> >
> > Precisely, you can configure a Kafka producer to use the "sync" mode
> > and batch messages that require transactional guarantees in a
> > single send() call. That will ensure that either all the messages in
> > the batch are sent or none.
> >
> > Thanks,
> > Neha
> >
> > On Thu, Oct 25, 2012 at 2:44 PM, Tom Brown <to...@gmail.com> wrote:
> > > Is there an accepted, or recommended way to make writes to a Kafka
> > > queue idempotent, or within a transaction?
> > >
> > > I can configure my system such that each queue has exactly one
> producer.
> > >
> > > (If there are no accepted/recommended ways, I have a few ideas I would
> > > like to propose. I would also be willing to implement them if needed)
> > >
> > > Thanks in advance!
> > >
> > > --Tom
> >
>

Re: Transactional writing

Posted by Jay Kreps <ja...@gmail.com>.
There are a few oddities of using the batching feature to get a kind of
transactionality (that wasn't the original intention):
1. It only actually works if you enable compression. Currently I don't
think we allow uncompressed recursive message batches. Without this, the
batching only protects against producer failure; in the case of broker
failure there is no guarantee (either with or without replication) that you
won't fail in the middle of writing the batch. We could
consider separating out the batching from the compression support to make
this work in a more sane way.
2. It doesn't work across partitions or topics.

-Jay

On Thu, Oct 25, 2012 at 6:19 PM, Neha Narkhede <ne...@gmail.com> wrote:

> The closest concept of transaction on the publisher side, that I can
> think of, is using batch of messages in a single call to the
> synchronous producer.
>
> Precisely, you can configure a Kafka producer to use the "sync" mode
> and batch messages that require transactional guarantees in a
> single send() call. That will ensure that either all the messages in
> the batch are sent or none.
>
> Thanks,
> Neha
>
> On Thu, Oct 25, 2012 at 2:44 PM, Tom Brown <to...@gmail.com> wrote:
> > Is there an accepted, or recommended way to make writes to a Kafka
> > queue idempotent, or within a transaction?
> >
> > I can configure my system such that each queue has exactly one producer.
> >
> > (If there are no accepted/recommended ways, I have a few ideas I would
> > like to propose. I would also be willing to implement them if needed)
> >
> > Thanks in advance!
> >
> > --Tom
>

Re: Transactional writing

Posted by Neha Narkhede <ne...@gmail.com>.
The closest concept to a transaction on the publisher side that I can
think of is using a batch of messages in a single call to the
synchronous producer.

Precisely, you can configure a Kafka producer to use the "sync" mode
and batch messages that require transactional guarantees in a
single send() call. That will ensure that either all the messages in
the batch are sent or none.

Thanks,
Neha

On Thu, Oct 25, 2012 at 2:44 PM, Tom Brown <to...@gmail.com> wrote:
> Is there an accepted, or recommended way to make writes to a Kafka
> queue idempotent, or within a transaction?
>
> I can configure my system such that each queue has exactly one producer.
>
> (If there are no accepted/recommended ways, I have a few ideas I would
> like to propose. I would also be willing to implement them if needed)
>
> Thanks in advance!
>
> --Tom