Posted to users@kafka.apache.org by Kane Kane <ka...@gmail.com> on 2013/10/16 17:56:33 UTC

producer.send atomic?

Hello, as I understand it, send is not atomic. For example, I have something
like this in my code:

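    import kafka.producer.KeyedMessage
    import scala.collection.mutable.ArrayBuffer

    // Build one KeyedMessage per payload (null key, payload as partition key)
    val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
    for (message <- messages) {
      requests += new KeyedMessage(topic, null, message, message)
    }
    producer.send(requests: _*)  // the Scala producer's send takes varargs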

Does that mean a batch can fail partway through?
Also, what happens if a broker that leads some of the partitions dies during
this write? The same question applies to the consumer: what happens if a
broker dies while a consumer is reading from it?

Thanks.

Re: producer.send atomic?

Posted by Guozhang Wang <wa...@gmail.com>.
The "atomicity" is per broker request: one batch can be split into produce
requests to multiple brokers, and if one produce request fails, only that
request is retried, not the whole batch.

The producer does record in its logs which requests were sent successfully,
but that information is not returned by the send() function call.
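
To illustrate the per-broker behavior described above, here is a minimal
sketch. It is not the actual DefaultEventHandler code: Msg, leaderFor, and
sendToBroker are hypothetical stand-ins, and the partition-to-leader mapping
is an assumption made only for this example.

```scala
object PartialBatchSketch {
  // Hypothetical stand-in for a message; not a Kafka class.
  final case class Msg(partition: Int, payload: String)

  // Assumed mapping from a partition to the broker that leads it.
  def leaderFor(partition: Int, brokerCount: Int): Int = partition % brokerCount

  /** Sends `batch` grouped per broker and retries only the groups that failed.
    * `sendToBroker(brokerId, msgs)` returns true when that whole request succeeded.
    * Returns the messages still undelivered after all attempts.
    */
  def sendBatch(
      batch: Seq[Msg],
      brokerCount: Int,
      maxRetries: Int,
      sendToBroker: (Int, Seq[Msg]) => Boolean
  ): Seq[Msg] = {
    var pending = batch
    var attempt = 0
    while (pending.nonEmpty && attempt <= maxRetries) {
      val byBroker = pending.groupBy(m => leaderFor(m.partition, brokerCount))
      // Each per-broker request is all-or-nothing; keep only the failed ones.
      pending = byBroker.toSeq.flatMap { case (broker, msgs) =>
        if (sendToBroker(broker, msgs)) Seq.empty[Msg] else msgs
      }
      attempt += 1
    }
    pending
  }
}
```

Unlike send() as described above, this sketch hands the undelivered messages
back to the caller, purely to make the partial-failure case visible.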

Guozhang



Re: producer.send atomic?

Posted by Kane Kane <ka...@gmail.com>.
Hi, so yeah, as I see here:
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blob;f=core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala;h=c8326a8a991cdfebec0d86003d08ce8d2e2c6986;hb=HEAD#l94
it looks like a batch to a single broker is indeed atomic. What if I have
messages for all brokers in a single batch? Does that mean some requests
might succeed and others not?
Does producer.send report which messages were written successfully and
which were not?

Thanks!



Re: producer.send atomic?

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Kane,

If the producer is async, the send(requests) function call does not
necessarily trigger the actual send. The send is triggered once either
enough time has elapsed or enough messages have been batched on the client
side. One batch of messages to each broker will either be sent successfully
or not at all (in this sense "atomic"), and if it fails, the whole batch
will be retried against that broker.
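
The two triggers described above (elapsed time or batch size) can be
sketched as follows. This is an illustration only, not the producer's real
implementation: Batcher, maxBatchSize, and maxWaitMs are hypothetical names,
and time is passed in explicitly to keep the example deterministic.

```scala
object BatchTriggerSketch {
  /** Buffers messages and releases a batch once `maxBatchSize` messages are
    * queued or `maxWaitMs` has passed since the oldest buffered message arrived.
    */
  final class Batcher[A](maxBatchSize: Int, maxWaitMs: Long) {
    private var buffer = Vector.empty[A]
    private var oldestAtMs = 0L

    /** Enqueues `msg`; returns a batch only if one of the triggers fired. */
    def offer(msg: A, nowMs: Long): Option[Vector[A]] = {
      if (buffer.isEmpty) oldestAtMs = nowMs
      buffer :+= msg
      poll(nowMs)
    }

    /** Checks both triggers without adding a message. */
    def poll(nowMs: Long): Option[Vector[A]] = {
      val full = buffer.size >= maxBatchSize
      val timedOut = nowMs - oldestAtMs >= maxWaitMs
      if (buffer.nonEmpty && (full || timedOut)) {
        val out = buffer
        buffer = Vector.empty[A]
        Some(out)
      } else None
    }
  }
}
```

With a Batcher of size 3 and a 100 ms wait, the third offer releases a batch
right away, while a single buffered message is released only once poll is
called at least 100 ms after it arrived.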

As for consumers (I am assuming high-level consumers here), the consumers
themselves keep track of the offsets up to which they have consumed, so if
a consume request fails, the consumer will simply re-issue the request
starting from the previous offset.
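
The consumer-side retry described above could look roughly like this. It is
a sketch under assumptions, not the high-level consumer's actual code:
consume and fetch are hypothetical, and fetch returning None stands for a
failed request (for example, because the broker died).

```scala
object ConsumerRetrySketch {
  /** Reads from `startOffset` up to `endOffset`, re-issuing a failed fetch
    * from the last offset that was consumed successfully.
    * `fetch(offset)` returns messages starting at `offset`, or None on failure.
    */
  def consume(
      startOffset: Long,
      endOffset: Long,
      maxAttempts: Int,
      fetch: Long => Option[Seq[String]]
  ): (Long, Vector[String]) = {
    var offset = startOffset
    var consumed = Vector.empty[String]
    var attempts = 0
    while (offset < endOffset && attempts < maxAttempts) {
      attempts += 1
      fetch(offset) match {
        case Some(msgs) =>
          consumed ++= msgs
          offset += msgs.size // advance only after a successful fetch
        case None =>
          () // offset unchanged: the next iteration retries the same position
      }
    }
    (offset, consumed)
  }
}
```

Because the offset only moves forward after a successful fetch, a failed
request costs one extra round trip but does not skip any messages.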

Guozhang

