You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Kane Kane <ka...@gmail.com> on 2013/10/25 17:45:12 UTC

producer exceptions when broker dies

I have cluster of 3 kafka brokers. With the following script I send some
data to kafka and in the middle do the controlled shutdown of 1 broker. All
3 brokers are ISR before I start sending. When i shutdown the broker i get
a couple of exceptions and I expect data shouldn't be written. Say, I send
1500 lines and get 50 exceptions. I expect to consume 1450 lines, but
instead i always consume more, i.e. 1480 or 1490. I want to decide if I
want to retry sending myself, not using message.send.max.retries. But looks
like if I retry sending if there is an exception - I will end up with
duplicates. Is there anything I'm doing wrong or having wrong assumptions
about kafka?

Thanks.

val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,
10.80.42.156:9092")
var count = 0
for(line <- Source.fromFile(file).getLines()){
    try {
      prod.send("benchmark", buffer.toList)
      count += 1
      println("sent %s", count)
    } catch {
      case _ => println("Exception!")
    }
}

class MyProducer(brokerList: String) {
  val sync = true
  val requestRequiredAcks = "-1"

  val props = new Properties()
  props.put("metadata.broker.list", brokerList)
  props.put("producer.type", if(sync) "sync" else "async")
  props.put("request.required.acks", requestRequiredAcks)
  props.put("key.serializer.class", classOf[StringEncoder].getName)
  props.put("serializer.class", classOf[StringEncoder].getName)
  props.put("message.send.max.retries", "0")
  props.put("request.timeout.ms", "2000")

  val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))

  def send(topic: String, messages: List[String]) = {
    val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
    for (message <- messages) {
      requests += new KeyedMessage(topic, null, message, message)
    }
    producer.send(requests)
  }
}

Re: producer exceptions when broker dies

Posted by Kane Kane <ka...@gmail.com>.
I.e. from the documentation:

So effectively Kafka guarantees at-least-once delivery by default and
allows the user to implement at most once delivery by disabling retries on
the producer

I've disabled retries but it's not at-most-once which my test proves. It's
still at-least-once.



On Fri, Oct 25, 2013 at 11:26 AM, Kane Kane <ka...@gmail.com> wrote:

> Hello Aniket,
>
> Thanks for the answer, this totally makes sense and implementing that
> layer on consumer side
> to check for dups sound like a good solution to this issue.
>
> Can we get a confirmation from kafka devs that this is how kafka supposed
> to work (by design)
> and how we should implement the solution to it. I'd hate to implement
> something that is already
> built into kafka (i.e. controlled by some configuration settings).
>
> Thanks again.
>
>
>
> On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
> aniket.bhatnagar@gmail.com> wrote:
>
>> As per my understanding, if the broker says the msg is committed,  its
>> guaranteed to have been committed as per ur ack config. If it says it did
>> not get committed, then its very hard to figure out if this was just a
>> false error. Since there is concept of unique ids for messages, a replay
>> of
>> the same message will result in duplication. I think its a reasonable
>> behaviour considering kafka prefers to append data to partitions fot
>> performance reasons.
>> The best way to right now deal with duplicate msgs is to build the
>> processing engine (layer where your consumer sits) to deal with at least
>> once semantics of the broker.
>> On 25 Oct 2013 23:23, "Kane Kane" <ka...@gmail.com> wrote:
>>
>> > Or, to rephrase it more generally, is there a way to know exactly if
>> > message was committed or no?
>> >
>> >
>> > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <ka...@gmail.com>
>> wrote:
>> >
>> > > Hello Guozhang,
>> > >
>> > > My partitions are split almost evenly between broker, so, yes - broker
>> > > that I shutdown is the leader for some of them. Does it mean i can
>> get an
>> > > exception and data is still being written? Is there any setting on the
>> > > broker where i can control this? I.e. can i make broker replication
>> > timeout
>> > > shorter than producer timeout, so i can ensure if i get an exception
>> data
>> > > is not being committed?
>> > >
>> > > Thanks.
>> > >
>> > >
>> > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <wangguoz@gmail.com
>> > >wrote:
>> > >
>> > >> Hello Kane,
>> > >>
>> > >> As discussed in the other thread, even if a timeout response is sent
>> > back
>> > >> to the producer, the message may still be committed.
>> > >>
>> > >> Did you shut down the leader broker of the partition or a follower
>> > broker?
>> > >>
>> > >> Guozhang
>> > >>
>> > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <ka...@gmail.com>
>> > wrote:
>> > >>
>> > >> > I have cluster of 3 kafka brokers. With the following script I send
>> > some
>> > >> > data to kafka and in the middle do the controlled shutdown of 1
>> > broker.
>> > >> All
>> > >> > 3 brokers are ISR before I start sending. When i shutdown the
>> broker i
>> > >> get
>> > >> > a couple of exceptions and I expect data shouldn't be written.
>> Say, I
>> > >> send
>> > >> > 1500 lines and get 50 exceptions. I expect to consume 1450 lines,
>> but
>> > >> > instead i always consume more, i.e. 1480 or 1490. I want to decide
>> if
>> > I
>> > >> > want to retry sending myself, not using message.send.max.retries.
>> But
>> > >> looks
>> > >> > like if I retry sending if there is an exception - I will end up
>> with
>> > >> > duplicates. Is there anything I'm doing wrong or having wrong
>> > >> assumptions
>> > >> > about kafka?
>> > >> >
>> > >> > Thanks.
>> > >> >
>> > >> > val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,
>> > >> > 10.80.42.156:9092")
>> > >> > var count = 0
>> > >> > for(line <- Source.fromFile(file).getLines()){
>> > >> >     try {
>> > >> >       prod.send("benchmark", buffer.toList)
>> > >> >       count += 1
>> > >> >       println("sent %s", count)
>> > >> >     } catch {
>> > >> >       case _ => println("Exception!")
>> > >> >     }
>> > >> > }
>> > >> >
>> > >> > class MyProducer(brokerList: String) {
>> > >> >   val sync = true
>> > >> >   val requestRequiredAcks = "-1"
>> > >> >
>> > >> >   val props = new Properties()
>> > >> >   props.put("metadata.broker.list", brokerList)
>> > >> >   props.put("producer.type", if(sync) "sync" else "async")
>> > >> >   props.put("request.required.acks", requestRequiredAcks)
>> > >> >   props.put("key.serializer.class", classOf[StringEncoder].getName)
>> > >> >   props.put("serializer.class", classOf[StringEncoder].getName)
>> > >> >   props.put("message.send.max.retries", "0")
>> > >> >   props.put("request.timeout.ms", "2000")
>> > >> >
>> > >> >   val producer = new Producer[AnyRef, AnyRef](new
>> > ProducerConfig(props))
>> > >> >
>> > >> >   def send(topic: String, messages: List[String]) = {
>> > >> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
>> > >> >     for (message <- messages) {
>> > >> >       requests += new KeyedMessage(topic, null, message, message)
>> > >> >     }
>> > >> >     producer.send(requests)
>> > >> >   }
>> > >> > }
>> > >> >
>> > >>
>> > >>
>> > >>
>> > >> --
>> > >> -- Guozhang
>> > >>
>> > >
>> > >
>> >
>>
>
>

Re: producer exceptions when broker dies

Posted by Kane Kane <ka...@gmail.com>.
Hello Aniket,

Thanks for the answer, this totally makes sense and implementing that layer
on consumer side
to check for dups sound like a good solution to this issue.

Can we get a confirmation from kafka devs that this is how kafka supposed
to work (by design)
and how we should implement the solution to it. I'd hate to implement
something that is already
built into kafka (i.e. controlled by some configuration settings).

Thanks again.



On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
aniket.bhatnagar@gmail.com> wrote:

> As per my understanding, if the broker says the msg is committed,  its
> guaranteed to have been committed as per ur ack config. If it says it did
> not get committed, then its very hard to figure out if this was just a
> false error. Since there is concept of unique ids for messages, a replay of
> the same message will result in duplication. I think its a reasonable
> behaviour considering kafka prefers to append data to partitions fot
> performance reasons.
> The best way to right now deal with duplicate msgs is to build the
> processing engine (layer where your consumer sits) to deal with at least
> once semantics of the broker.
> On 25 Oct 2013 23:23, "Kane Kane" <ka...@gmail.com> wrote:
>
> > Or, to rephrase it more generally, is there a way to know exactly if
> > message was committed or no?
> >
> >
> > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <ka...@gmail.com>
> wrote:
> >
> > > Hello Guozhang,
> > >
> > > My partitions are split almost evenly between broker, so, yes - broker
> > > that I shutdown is the leader for some of them. Does it mean i can get
> an
> > > exception and data is still being written? Is there any setting on the
> > > broker where i can control this? I.e. can i make broker replication
> > timeout
> > > shorter than producer timeout, so i can ensure if i get an exception
> data
> > > is not being committed?
> > >
> > > Thanks.
> > >
> > >
> > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <wangguoz@gmail.com
> > >wrote:
> > >
> > >> Hello Kane,
> > >>
> > >> As discussed in the other thread, even if a timeout response is sent
> > back
> > >> to the producer, the message may still be committed.
> > >>
> > >> Did you shut down the leader broker of the partition or a follower
> > broker?
> > >>
> > >> Guozhang
> > >>
> > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <ka...@gmail.com>
> > wrote:
> > >>
> > >> > I have cluster of 3 kafka brokers. With the following script I send
> > some
> > >> > data to kafka and in the middle do the controlled shutdown of 1
> > broker.
> > >> All
> > >> > 3 brokers are ISR before I start sending. When i shutdown the
> broker i
> > >> get
> > >> > a couple of exceptions and I expect data shouldn't be written. Say,
> I
> > >> send
> > >> > 1500 lines and get 50 exceptions. I expect to consume 1450 lines,
> but
> > >> > instead i always consume more, i.e. 1480 or 1490. I want to decide
> if
> > I
> > >> > want to retry sending myself, not using message.send.max.retries.
> But
> > >> looks
> > >> > like if I retry sending if there is an exception - I will end up
> with
> > >> > duplicates. Is there anything I'm doing wrong or having wrong
> > >> assumptions
> > >> > about kafka?
> > >> >
> > >> > Thanks.
> > >> >
> > >> > val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,
> > >> > 10.80.42.156:9092")
> > >> > var count = 0
> > >> > for(line <- Source.fromFile(file).getLines()){
> > >> >     try {
> > >> >       prod.send("benchmark", buffer.toList)
> > >> >       count += 1
> > >> >       println("sent %s", count)
> > >> >     } catch {
> > >> >       case _ => println("Exception!")
> > >> >     }
> > >> > }
> > >> >
> > >> > class MyProducer(brokerList: String) {
> > >> >   val sync = true
> > >> >   val requestRequiredAcks = "-1"
> > >> >
> > >> >   val props = new Properties()
> > >> >   props.put("metadata.broker.list", brokerList)
> > >> >   props.put("producer.type", if(sync) "sync" else "async")
> > >> >   props.put("request.required.acks", requestRequiredAcks)
> > >> >   props.put("key.serializer.class", classOf[StringEncoder].getName)
> > >> >   props.put("serializer.class", classOf[StringEncoder].getName)
> > >> >   props.put("message.send.max.retries", "0")
> > >> >   props.put("request.timeout.ms", "2000")
> > >> >
> > >> >   val producer = new Producer[AnyRef, AnyRef](new
> > ProducerConfig(props))
> > >> >
> > >> >   def send(topic: String, messages: List[String]) = {
> > >> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
> > >> >     for (message <- messages) {
> > >> >       requests += new KeyedMessage(topic, null, message, message)
> > >> >     }
> > >> >     producer.send(requests)
> > >> >   }
> > >> > }
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
>

Re: producer exceptions when broker dies

Posted by Guozhang Wang <wa...@gmail.com>.
I think you are right. I was too quick saying that pre 0.7 we also have
"at-least-once".

Guozhang


On Sun, Oct 27, 2013 at 9:51 AM, Jason Rosenberg <jb...@squareup.com> wrote:

> Perhaps, it's a matter of semantics.
>
> But, I think I'm not talking only about failure, but normal operation.
>  It's normal to take a cluster down for maintenance, or code update.  And
> this should be done a rolling restart manner (1 server at a time).
>
> The reason for replication, is to increase reliability (as well as
> availability).  We've said that in 0.8.0, we can tolerate R-1 node failures
> (where R is the replication factor).  From the client's standpoint, this
> should mean that the cluster is still available and reliable when a single
> node is down (assuming R is > 1).
>
> When you say "at least once", you are suggesting that the message will be
> delivered at least once, and won't be lost.
>
> I don't think that was ever really true about the previous version <= 0.7
> (and I don't think I ever really read that about 0.7, as a guarantee, did
> I?).
>
> Jason
>
>
> On Sat, Oct 26, 2013 at 8:52 PM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hello Jason,
> >
> > You are right. I think we just have different definitions about "at least
> > once". What you have described to me is more related to "availability",
> > which says that your message will not be lost when there are failures.
> And
> > we achieve this through replication (which is related to
> > request.required.acks). In 0.7, we do not have any replications and hence
> > when a broker goes down all the partitions it holds will be
> non-available,
> > but we still defined Kafka as a "at least once" messaging system.
> >
> > Guozhang
> >
> >
> > On Sat, Oct 26, 2013 at 9:32 AM, Jason Rosenberg <jb...@squareup.com>
> wrote:
> >
> > > Guozhang,
> > >
> > > It turns out this is not entirely true, you do need
> > request.required.acks =
> > > -1 (and yes, you need to retry if failure) in order have guaranteed
> > > delivery.
> > >
> > > I discovered this, when doing tests with rolling restarts (and hard
> > > restarts) of the kafka servers.  If the server goes down, e.g. if
> there's
> > > any change in the ISR leader for a partition, then the only way you can
> > be
> > > sure what you produced will be available on a newly elected leader is
> to
> > > use -1.
> > >
> > > Jason
> > >
> > >
> > >
> > >
> > > On Sat, Oct 26, 2013 at 12:11 PM, Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > >
> > > > Jason,
> > > >
> > > > Setting request.required.acks=-1 is orthogonal to the 'at least once'
> > > > guarantee, it only relates to the latency/replication trade-off. For
> > > > example, even if you set request.required.acks to 1, and as long as
> you
> > > > retry on all non-fatal exceptions you have the "at least once"
> > guarantee;
> > > > and even if you set request.required.acks to -1 and you do not retry,
> > you
> > > > will not get "at least once".
> > > >
> > > > As you said, setting request.required.acks=-1 only means that when a
> > > > success response has been returned you know that the message has been
> > > > received by all ISR replicas, but this has nothing to do with "at
> least
> > > > once" guarantee.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Fri, Oct 25, 2013 at 8:55 PM, Jason Rosenberg <jb...@squareup.com>
> > > wrote:
> > > >
> > > > > Just to clarify, I think in order to get 'at least once'
> guarantees,
> > > you
> > > > > must produce messages with 'request.required.acks=-1'.  Otherwise,
> > you
> > > > > can't be 100% sure the message was received by all ISR replicas.
> > > > >
> > > > >
> > > > > On Fri, Oct 25, 2013 at 9:56 PM, Kane Kane <ka...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Thanks Guozhang, it makes sense if it's by design. Just wanted to
> > > > ensure
> > > > > > i'm not doing something wrong.
> > > > > >
> > > > > >
> > > > > > On Fri, Oct 25, 2013 at 5:57 PM, Guozhang Wang <
> wangguoz@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > As we have said, the timeout exception does not actually mean
> the
> > > > > message
> > > > > > > is not committed to the broker. When message.send.max.retries
> is
> > 0
> > > > > Kafka
> > > > > > > does guarantee "at-most-once" which means that you will not
> have
> > > > > > > duplicates, but not means that all your exceptions can be
> treated
> > > as
> > > > > > > "message not delivered". In your case, 1480 - 1450 = 30
> messages
> > > are
> > > > > the
> > > > > > > ones that are actually sent, not the ones that are duplicates.
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Oct 25, 2013 at 5:00 PM, Kane Kane <
> > kane.isturm@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > There are a lot of exceptions, I will try to pick an example
> of
> > > > each:
> > > > > > > > ERROR async.DefaultEventHandler - Failed to send requests for
> > > > topics
> > > > > > > > benchmark with correlation ids in [879,881]
> > > > > > > > WARN  async.DefaultEventHandler - Produce request with
> > > correlation
> > > > id
> > > > > > 874
> > > > > > > > failed due to [benchmark,43]:
> > > kafka.common.RequestTimedOutException
> > > > > > > > WARN  client.ClientUtils$ - Fetching topic metadata with
> > > > correlation
> > > > > id
> > > > > > > 876
> > > > > > > > for topics [Set(benchmark)] from broker
> > > > > > > [id:2,host:10.80.42.156,port:9092]
> > > > > > > > failed
> > > > > > > > ERROR producer.SyncProducer - Producer connection to
> > > > > > > > 10.80.42.156:9092unsuccessful
> > > > > > > > kafka.common.FailedToSendMessageException: Failed to send
> > > messages
> > > > > > after
> > > > > > > 0
> > > > > > > > tries.
> > > > > > > > WARN  async.DefaultEventHandler - Failed to send producer
> > request
> > > > > with
> > > > > > > > correlation id 270 to broker 0 with data for partitions
> > > > > [benchmark,42]
> > > > > > > >
> > > > > > > > I think these are all types of exceptions i see there.
> > > > > > > > Thanks.
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Oct 25, 2013 at 2:45 PM, Guozhang Wang <
> > > wangguoz@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Kane,
> > > > > > > > >
> > > > > > > > > If you set message.send.max.retries to 0 it should be
> > > > at-most-once,
> > > > > > > and I
> > > > > > > > > saw your props have the right config. What are the
> exceptions
> > > you
> > > > > got
> > > > > > > > from
> > > > > > > > > the send() call?
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Oct 25, 2013 at 12:54 PM, Steve Morin <
> > > > > steve@stevemorin.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Kane and Aniket,
> > > > > > > > > >   I am interested in knowing what the pattern/solution
> that
> > > > > people
> > > > > > > > > usually
> > > > > > > > > > use to implement exactly once as well.
> > > > > > > > > > -Steve
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Oct 25, 2013 at 11:39 AM, Kane Kane <
> > > > > kane.isturm@gmail.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Guozhang, but i've posted a piece from kafka
> > documentation
> > > > > above:
> > > > > > > > > > > So effectively Kafka guarantees at-least-once delivery
> by
> > > > > default
> > > > > > > and
> > > > > > > > > > > allows the user to implement at most once delivery by
> > > > disabling
> > > > > > > > retries
> > > > > > > > > > on
> > > > > > > > > > > the producer.
> > > > > > > > > > >
> > > > > > > > > > > What i want is at-most-once and docs claim it's
> possible
> > > with
> > > > > > > certain
> > > > > > > > > > > settings. Did i miss anything here?
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Oct 25, 2013 at 11:35 AM, Guozhang Wang <
> > > > > > > wangguoz@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Aniket is exactly right. In general, Kafka provides
> "at
> > > > least
> > > > > > > once"
> > > > > > > > > > > > guarantee instead of "exactly once".
> > > > > > > > > > > >
> > > > > > > > > > > > Guozhang
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
> > > > > > > > > > > > aniket.bhatnagar@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > As per my understanding, if the broker says the msg
> > is
> > > > > > > committed,
> > > > > > > > > >  its
> > > > > > > > > > > > > guaranteed to have been committed as per ur ack
> > config.
> > > > If
> > > > > it
> > > > > > > > says
> > > > > > > > > it
> > > > > > > > > > > did
> > > > > > > > > > > > > not get committed, then its very hard to figure out
> > if
> > > > this
> > > > > > was
> > > > > > > > > just
> > > > > > > > > > a
> > > > > > > > > > > > > false error. Since there is concept of unique ids
> for
> > > > > > > messages, a
> > > > > > > > > > > replay
> > > > > > > > > > > > of
> > > > > > > > > > > > > the same message will result in duplication. I
> think
> > > its
> > > > a
> > > > > > > > > reasonable
> > > > > > > > > > > > > behaviour considering kafka prefers to append data
> to
> > > > > > > partitions
> > > > > > > > > fot
> > > > > > > > > > > > > performance reasons.
> > > > > > > > > > > > > The best way to right now deal with duplicate msgs
> is
> > > to
> > > > > > build
> > > > > > > > the
> > > > > > > > > > > > > processing engine (layer where your consumer sits)
> to
> > > > deal
> > > > > > with
> > > > > > > > at
> > > > > > > > > > > least
> > > > > > > > > > > > > once semantics of the broker.
> > > > > > > > > > > > > On 25 Oct 2013 23:23, "Kane Kane" <
> > > kane.isturm@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Or, to rephrase it more generally, is there a way
> > to
> > > > know
> > > > > > > > exactly
> > > > > > > > > > if
> > > > > > > > > > > > > > message was committed or no?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <
> > > > > > > > > kane.isturm@gmail.com
> > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hello Guozhang,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > My partitions are split almost evenly between
> > > broker,
> > > > > so,
> > > > > > > > yes -
> > > > > > > > > > > > broker
> > > > > > > > > > > > > > > that I shutdown is the leader for some of them.
> > > Does
> > > > it
> > > > > > > mean
> > > > > > > > i
> > > > > > > > > > can
> > > > > > > > > > > > get
> > > > > > > > > > > > > an
> > > > > > > > > > > > > > > exception and data is still being written? Is
> > there
> > > > any
> > > > > > > > setting
> > > > > > > > > > on
> > > > > > > > > > > > the
> > > > > > > > > > > > > > > broker where i can control this? I.e. can i
> make
> > > > broker
> > > > > > > > > > replication
> > > > > > > > > > > > > > timeout
> > > > > > > > > > > > > > > shorter than producer timeout, so i can ensure
> > if i
> > > > get
> > > > > > an
> > > > > > > > > > > exception
> > > > > > > > > > > > > data
> > > > > > > > > > > > > > > is not being committed?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang
> Wang <
> > > > > > > > > > > wangguoz@gmail.com
> > > > > > > > > > > > > > >wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >> Hello Kane,
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> As discussed in the other thread, even if a
> > > timeout
> > > > > > > response
> > > > > > > > > is
> > > > > > > > > > > sent
> > > > > > > > > > > > > > back
> > > > > > > > > > > > > > >> to the producer, the message may still be
> > > committed.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Did you shut down the leader broker of the
> > > partition
> > > > > or
> > > > > > a
> > > > > > > > > > follower
> > > > > > > > > > > > > > broker?
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Guozhang
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <
> > > > > > > > > > kane.isturm@gmail.com
> > > > > > > > > > > >
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> > I have cluster of 3 kafka brokers. With the
> > > > > following
> > > > > > > > > script I
> > > > > > > > > > > > send
> > > > > > > > > > > > > > some
> > > > > > > > > > > > > > >> > data to kafka and in the middle do the
> > > controlled
> > > > > > > shutdown
> > > > > > > > > of
> > > > > > > > > > 1
> > > > > > > > > > > > > > broker.
> > > > > > > > > > > > > > >> All
> > > > > > > > > > > > > > >> > 3 brokers are ISR before I start sending.
> > When i
> > > > > > > shutdown
> > > > > > > > > the
> > > > > > > > > > > > > broker i
> > > > > > > > > > > > > > >> get
> > > > > > > > > > > > > > >> > a couple of exceptions and I expect data
> > > shouldn't
> > > > > be
> > > > > > > > > written.
> > > > > > > > > > > > Say,
> > > > > > > > > > > > > I
> > > > > > > > > > > > > > >> send
> > > > > > > > > > > > > > >> > 1500 lines and get 50 exceptions. I expect
> to
> > > > > consume
> > > > > > > 1450
> > > > > > > > > > > lines,
> > > > > > > > > > > > > but
> > > > > > > > > > > > > > >> > instead i always consume more, i.e. 1480 or
> > > 1490.
> > > > I
> > > > > > want
> > > > > > > > to
> > > > > > > > > > > decide
> > > > > > > > > > > > > if
> > > > > > > > > > > > > > I
> > > > > > > > > > > > > > >> > want to retry sending myself, not using
> > > > > > > > > > > message.send.max.retries.
> > > > > > > > > > > > > But
> > > > > > > > > > > > > > >> looks
> > > > > > > > > > > > > > >> > like if I retry sending if there is an
> > exception
> > > > - I
> > > > > > > will
> > > > > > > > > end
> > > > > > > > > > up
> > > > > > > > > > > > > with
> > > > > > > > > > > > > > >> > duplicates. Is there anything I'm doing
> wrong
> > or
> > > > > > having
> > > > > > > > > wrong
> > > > > > > > > > > > > > >> assumptions
> > > > > > > > > > > > > > >> > about kafka?
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > Thanks.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > val prod = new MyProducer("
> 10.80.42.147:9092,
> > > > > > > > > > 10.80.42.154:9092,
> > > > > > > > > > > > > > >> > 10.80.42.156:9092")
> > > > > > > > > > > > > > >> > var count = 0
> > > > > > > > > > > > > > >> > for(line <-
> Source.fromFile(file).getLines()){
> > > > > > > > > > > > > > >> >     try {
> > > > > > > > > > > > > > >> >       prod.send("benchmark", buffer.toList)
> > > > > > > > > > > > > > >> >       count += 1
> > > > > > > > > > > > > > >> >       println("sent %s", count)
> > > > > > > > > > > > > > >> >     } catch {
> > > > > > > > > > > > > > >> >       case _ => println("Exception!")
> > > > > > > > > > > > > > >> >     }
> > > > > > > > > > > > > > >> > }
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > class MyProducer(brokerList: String) {
> > > > > > > > > > > > > > >> >   val sync = true
> > > > > > > > > > > > > > >> >   val requestRequiredAcks = "-1"
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >   val props = new Properties()
> > > > > > > > > > > > > > >> >   props.put("metadata.broker.list",
> > brokerList)
> > > > > > > > > > > > > > >> >   props.put("producer.type", if(sync) "sync"
> > > else
> > > > > > > "async")
> > > > > > > > > > > > > > >> >   props.put("request.required.acks",
> > > > > > > requestRequiredAcks)
> > > > > > > > > > > > > > >> >   props.put("key.serializer.class",
> > > > > > > > > > > > classOf[StringEncoder].getName)
> > > > > > > > > > > > > > >> >   props.put("serializer.class",
> > > > > > > > > > classOf[StringEncoder].getName)
> > > > > > > > > > > > > > >> >   props.put("message.send.max.retries", "0")
> > > > > > > > > > > > > > >> >   props.put("request.timeout.ms", "2000")
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >   val producer = new Producer[AnyRef,
> > > AnyRef](new
> > > > > > > > > > > > > > ProducerConfig(props))
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >   def send(topic: String, messages:
> > > List[String])
> > > > =
> > > > > {
> > > > > > > > > > > > > > >> >     val requests = new
> > > > > > ArrayBuffer[KeyedMessage[AnyRef,
> > > > > > > > > > AnyRef]]
> > > > > > > > > > > > > > >> >     for (message <- messages) {
> > > > > > > > > > > > > > >> >       requests += new KeyedMessage(topic,
> > null,
> > > > > > message,
> > > > > > > > > > > message)
> > > > > > > > > > > > > > >> >     }
> > > > > > > > > > > > > > >> >     producer.send(requests)
> > > > > > > > > > > > > > >> >   }
> > > > > > > > > > > > > > >> > }
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> --
> > > > > > > > > > > > > > >> -- Guozhang
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > --
> > > > > > > > > > > > -- Guozhang
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > -- Guozhang
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: producer exceptions when broker dies

Posted by Jason Rosenberg <jb...@squareup.com>.
Perhaps, it's a matter of semantics.

But, I think I'm not talking only about failure, but normal operation.
 It's normal to take a cluster down for maintenance, or code update.  And
this should be done a rolling restart manner (1 server at a time).

The reason for replication, is to increase reliability (as well as
availability).  We've said that in 0.8.0, we can tolerate R-1 node failures
(where R is the replication factor).  From the client's standpoint, this
should mean that the cluster is still available and reliable when a single
node is down (assuming R is > 1).

When you say "at least once", you are suggesting that the message will be
delivered at least once, and won't be lost.

I don't think that was ever really true about the previous version <= 0.7
(and I don't think I ever really read that about 0.7, as a guarantee, did
I?).

Jason


On Sat, Oct 26, 2013 at 8:52 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Hello Jason,
>
> You are right. I think we just have different definitions about "at least
> once". What you have described to me is more related to "availability",
> which says that your message will not be lost when there are failures. And
> we achieve this through replication (which is related to
> request.required.acks). In 0.7, we do not have any replications and hence
> when a broker goes down all the partitions it holds will be non-available,
> but we still defined Kafka as a "at least once" messaging system.
>
> Guozhang
>
>
> On Sat, Oct 26, 2013 at 9:32 AM, Jason Rosenberg <jb...@squareup.com> wrote:
>
> > Guozhang,
> >
> > It turns out this is not entirely true, you do need
> request.required.acks =
> > -1 (and yes, you need to retry if failure) in order have guaranteed
> > delivery.
> >
> > I discovered this, when doing tests with rolling restarts (and hard
> > restarts) of the kafka servers.  If the server goes down, e.g. if there's
> > any change in the ISR leader for a partition, then the only way you can
> be
> > sure what you produced will be available on a newly elected leader is to
> > use -1.
> >
> > Jason
> >
> >
> >
> >
> > On Sat, Oct 26, 2013 at 12:11 PM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> >
> > > Jason,
> > >
> > > Setting request.required.acks=-1 is orthogonal to the 'at least once'
> > > guarantee, it only relates to the latency/replication trade-off. For
> > > example, even if you set request.required.acks to 1, and as long as you
> > > retry on all non-fatal exceptions you have the "at least once"
> guarantee;
> > > and even if you set request.required.acks to -1 and you do not retry,
> you
> > > will not get "at least once".
> > >
> > > As you said, setting request.required.acks=-1 only means that when a
> > > success response has been returned you know that the message has been
> > > received by all ISR replicas, but this has nothing to do with "at least
> > > once" guarantee.
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Oct 25, 2013 at 8:55 PM, Jason Rosenberg <jb...@squareup.com>
> > wrote:
> > >
> > > > Just to clarify, I think in order to get 'at least once' guarantees,
> > you
> > > > must produce messages with 'request.required.acks=-1'.  Otherwise,
> you
> > > > can't be 100% sure the message was received by all ISR replicas.
> > > >
> > > >
> > > > On Fri, Oct 25, 2013 at 9:56 PM, Kane Kane <ka...@gmail.com>
> > > wrote:
> > > >
> > > > > Thanks Guozhang, it makes sense if it's by design. Just wanted to
> > > ensure
> > > > > i'm not doing something wrong.
> > > > >
> > > > >
> > > > > On Fri, Oct 25, 2013 at 5:57 PM, Guozhang Wang <wangguoz@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > As we have said, the timeout exception does not actually mean the
> > > > message
> > > > > > is not committed to the broker. When message.send.max.retries is
> 0
> > > > Kafka
> > > > > > does guarantee "at-most-once" which means that you will not have
> > > > > > duplicates, but not means that all your exceptions can be treated
> > as
> > > > > > "message not delivered". In your case, 1480 - 1450 = 30 messages
> > are
> > > > the
> > > > > > ones that are actually sent, not the ones that are duplicates.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Fri, Oct 25, 2013 at 5:00 PM, Kane Kane <
> kane.isturm@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > There are a lot of exceptions, I will try to pick an example of
> > > each:
> > > > > > > ERROR async.DefaultEventHandler - Failed to send requests for
> > > topics
> > > > > > > benchmark with correlation ids in [879,881]
> > > > > > > WARN  async.DefaultEventHandler - Produce request with
> > correlation
> > > id
> > > > > 874
> > > > > > > failed due to [benchmark,43]:
> > kafka.common.RequestTimedOutException
> > > > > > > WARN  client.ClientUtils$ - Fetching topic metadata with
> > > correlation
> > > > id
> > > > > > 876
> > > > > > > for topics [Set(benchmark)] from broker
> > > > > > [id:2,host:10.80.42.156,port:9092]
> > > > > > > failed
> > > > > > > ERROR producer.SyncProducer - Producer connection to
> > > > > > > 10.80.42.156:9092unsuccessful
> > > > > > > kafka.common.FailedToSendMessageException: Failed to send
> > messages
> > > > > after
> > > > > > 0
> > > > > > > tries.
> > > > > > > WARN  async.DefaultEventHandler - Failed to send producer
> request
> > > > with
> > > > > > > correlation id 270 to broker 0 with data for partitions
> > > > [benchmark,42]
> > > > > > >
> > > > > > > I think these are all types of exceptions i see there.
> > > > > > > Thanks.
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Oct 25, 2013 at 2:45 PM, Guozhang Wang <
> > wangguoz@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Kane,
> > > > > > > >
> > > > > > > > If you set message.send.max.retries to 0 it should be
> > > at-most-once,
> > > > > > and I
> > > > > > > > saw your props have the right config. What are the exceptions
> > you
> > > > got
> > > > > > > from
> > > > > > > > the send() call?
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Oct 25, 2013 at 12:54 PM, Steve Morin <
> > > > steve@stevemorin.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Kane and Aniket,
> > > > > > > > >   I am interested in knowing what the pattern/solution that
> > > > people
> > > > > > > > usually
> > > > > > > > > use to implement exactly once as well.
> > > > > > > > > -Steve
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Oct 25, 2013 at 11:39 AM, Kane Kane <
> > > > kane.isturm@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Guozhang, but i've posted a piece from kafka
> documentation
> > > > above:
> > > > > > > > > > So effectively Kafka guarantees at-least-once delivery by
> > > > default
> > > > > > and
> > > > > > > > > > allows the user to implement at most once delivery by
> > > disabling
> > > > > > > retries
> > > > > > > > > on
> > > > > > > > > > the producer.
> > > > > > > > > >
> > > > > > > > > > What i want is at-most-once and docs claim it's possible
> > with
> > > > > > certain
> > > > > > > > > > settings. Did i miss anything here?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Oct 25, 2013 at 11:35 AM, Guozhang Wang <
> > > > > > wangguoz@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Aniket is exactly right. In general, Kafka provides "at
> > > least
> > > > > > once"
> > > > > > > > > > > guarantee instead of "exactly once".
> > > > > > > > > > >
> > > > > > > > > > > Guozhang
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
> > > > > > > > > > > aniket.bhatnagar@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > As per my understanding, if the broker says the msg
> is
> > > > > > committed,
> > > > > > > > >  its
> > > > > > > > > > > > guaranteed to have been committed as per ur ack
> config.
> > > If
> > > > it
> > > > > > > says
> > > > > > > > it
> > > > > > > > > > did
> > > > > > > > > > > > not get committed, then its very hard to figure out
> if
> > > this
> > > > > was
> > > > > > > > just
> > > > > > > > > a
> > > > > > > > > > > > false error. Since there is concept of unique ids for
> > > > > > messages, a
> > > > > > > > > > replay
> > > > > > > > > > > of
> > > > > > > > > > > > the same message will result in duplication. I think
> > its
> > > a
> > > > > > > > reasonable
> > > > > > > > > > > > behaviour considering kafka prefers to append data to
> > > > > > partitions
> > > > > > > > fot
> > > > > > > > > > > > performance reasons.
> > > > > > > > > > > > The best way to right now deal with duplicate msgs is
> > to
> > > > > build
> > > > > > > the
> > > > > > > > > > > > processing engine (layer where your consumer sits) to
> > > deal
> > > > > with
> > > > > > > at
> > > > > > > > > > least
> > > > > > > > > > > > once semantics of the broker.
> > > > > > > > > > > > On 25 Oct 2013 23:23, "Kane Kane" <
> > kane.isturm@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Or, to rephrase it more generally, is there a way
> to
> > > know
> > > > > > > exactly
> > > > > > > > > if
> > > > > > > > > > > > > message was committed or no?
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <
> > > > > > > > kane.isturm@gmail.com
> > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hello Guozhang,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > My partitions are split almost evenly between
> > broker,
> > > > so,
> > > > > > > yes -
> > > > > > > > > > > broker
> > > > > > > > > > > > > > that I shutdown is the leader for some of them.
> > Does
> > > it
> > > > > > mean
> > > > > > > i
> > > > > > > > > can
> > > > > > > > > > > get
> > > > > > > > > > > > an
> > > > > > > > > > > > > > exception and data is still being written? Is
> there
> > > any
> > > > > > > setting
> > > > > > > > > on
> > > > > > > > > > > the
> > > > > > > > > > > > > > broker where i can control this? I.e. can i make
> > > broker
> > > > > > > > > replication
> > > > > > > > > > > > > timeout
> > > > > > > > > > > > > > shorter than producer timeout, so i can ensure
> if i
> > > get
> > > > > an
> > > > > > > > > > exception
> > > > > > > > > > > > data
> > > > > > > > > > > > > > is not being committed?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <
> > > > > > > > > > wangguoz@gmail.com
> > > > > > > > > > > > > >wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >> Hello Kane,
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> As discussed in the other thread, even if a
> > timeout
> > > > > > response
> > > > > > > > is
> > > > > > > > > > sent
> > > > > > > > > > > > > back
> > > > > > > > > > > > > >> to the producer, the message may still be
> > committed.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Did you shut down the leader broker of the
> > partition
> > > > or
> > > > > a
> > > > > > > > > follower
> > > > > > > > > > > > > broker?
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Guozhang
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <
> > > > > > > > > kane.isturm@gmail.com
> > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> > I have cluster of 3 kafka brokers. With the
> > > > following
> > > > > > > > script I
> > > > > > > > > > > send
> > > > > > > > > > > > > some
> > > > > > > > > > > > > >> > data to kafka and in the middle do the
> > controlled
> > > > > > shutdown
> > > > > > > > of
> > > > > > > > > 1
> > > > > > > > > > > > > broker.
> > > > > > > > > > > > > >> All
> > > > > > > > > > > > > >> > 3 brokers are ISR before I start sending.
> When i
> > > > > > shutdown
> > > > > > > > the
> > > > > > > > > > > > broker i
> > > > > > > > > > > > > >> get
> > > > > > > > > > > > > >> > a couple of exceptions and I expect data
> > shouldn't
> > > > be
> > > > > > > > written.
> > > > > > > > > > > Say,
> > > > > > > > > > > > I
> > > > > > > > > > > > > >> send
> > > > > > > > > > > > > >> > 1500 lines and get 50 exceptions. I expect to
> > > > consume
> > > > > > 1450
> > > > > > > > > > lines,
> > > > > > > > > > > > but
> > > > > > > > > > > > > >> > instead i always consume more, i.e. 1480 or
> > 1490.
> > > I
> > > > > want
> > > > > > > to
> > > > > > > > > > decide
> > > > > > > > > > > > if
> > > > > > > > > > > > > I
> > > > > > > > > > > > > >> > want to retry sending myself, not using
> > > > > > > > > > message.send.max.retries.
> > > > > > > > > > > > But
> > > > > > > > > > > > > >> looks
> > > > > > > > > > > > > >> > like if I retry sending if there is an
> exception
> > > - I
> > > > > > will
> > > > > > > > end
> > > > > > > > > up
> > > > > > > > > > > > with
> > > > > > > > > > > > > >> > duplicates. Is there anything I'm doing wrong
> or
> > > > > having
> > > > > > > > wrong
> > > > > > > > > > > > > >> assumptions
> > > > > > > > > > > > > >> > about kafka?
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > Thanks.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > val prod = new MyProducer("10.80.42.147:9092,
> > > > > > > > > 10.80.42.154:9092,
> > > > > > > > > > > > > >> > 10.80.42.156:9092")
> > > > > > > > > > > > > >> > var count = 0
> > > > > > > > > > > > > >> > for(line <- Source.fromFile(file).getLines()){
> > > > > > > > > > > > > >> >     try {
> > > > > > > > > > > > > >> >       prod.send("benchmark", buffer.toList)
> > > > > > > > > > > > > >> >       count += 1
> > > > > > > > > > > > > >> >       println("sent %s", count)
> > > > > > > > > > > > > >> >     } catch {
> > > > > > > > > > > > > >> >       case _ => println("Exception!")
> > > > > > > > > > > > > >> >     }
> > > > > > > > > > > > > >> > }
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > class MyProducer(brokerList: String) {
> > > > > > > > > > > > > >> >   val sync = true
> > > > > > > > > > > > > >> >   val requestRequiredAcks = "-1"
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> >   val props = new Properties()
> > > > > > > > > > > > > >> >   props.put("metadata.broker.list",
> brokerList)
> > > > > > > > > > > > > >> >   props.put("producer.type", if(sync) "sync"
> > else
> > > > > > "async")
> > > > > > > > > > > > > >> >   props.put("request.required.acks",
> > > > > > requestRequiredAcks)
> > > > > > > > > > > > > >> >   props.put("key.serializer.class",
> > > > > > > > > > > classOf[StringEncoder].getName)
> > > > > > > > > > > > > >> >   props.put("serializer.class",
> > > > > > > > > classOf[StringEncoder].getName)
> > > > > > > > > > > > > >> >   props.put("message.send.max.retries", "0")
> > > > > > > > > > > > > >> >   props.put("request.timeout.ms", "2000")
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> >   val producer = new Producer[AnyRef,
> > AnyRef](new
> > > > > > > > > > > > > ProducerConfig(props))
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> >   def send(topic: String, messages:
> > List[String])
> > > =
> > > > {
> > > > > > > > > > > > > >> >     val requests = new
> > > > > ArrayBuffer[KeyedMessage[AnyRef,
> > > > > > > > > AnyRef]]
> > > > > > > > > > > > > >> >     for (message <- messages) {
> > > > > > > > > > > > > >> >       requests += new KeyedMessage(topic,
> null,
> > > > > message,
> > > > > > > > > > message)
> > > > > > > > > > > > > >> >     }
> > > > > > > > > > > > > >> >     producer.send(requests)
> > > > > > > > > > > > > >> >   }
> > > > > > > > > > > > > >> > }
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> --
> > > > > > > > > > > > > >> -- Guozhang
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > -- Guozhang
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: producer exceptions when broker dies

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Jason,

You are right. I think we just have different definitions about "at least
once". What you have described to me is more related to "availability",
which says that your message will not be lost when there are failures. And
we achieve this through replication (which is related to
request.required.acks). In 0.7, we do not have any replications and hence
when a broker goes down all the partitions it holds will be non-available,
but we still defined Kafka as a "at least once" messaging system.

Guozhang


On Sat, Oct 26, 2013 at 9:32 AM, Jason Rosenberg <jb...@squareup.com> wrote:

> Guozhang,
>
> It turns out this is not entirely true, you do need request.required.acks =
> -1 (and yes, you need to retry if failure) in order have guaranteed
> delivery.
>
> I discovered this, when doing tests with rolling restarts (and hard
> restarts) of the kafka servers.  If the server goes down, e.g. if there's
> any change in the ISR leader for a partition, then the only way you can be
> sure what you produced will be available on a newly elected leader is to
> use -1.
>
> Jason
>
>
>
>
> On Sat, Oct 26, 2013 at 12:11 PM, Guozhang Wang <wa...@gmail.com>
> wrote:
>
> > Jason,
> >
> > Setting request.required.acks=-1 is orthogonal to the 'at least once'
> > guarantee, it only relates to the latency/replication trade-off. For
> > example, even if you set request.required.acks to 1, and as long as you
> > retry on all non-fatal exceptions you have the "at least once" guarantee;
> > and even if you set request.required.acks to -1 and you do not retry, you
> > will not get "at least once".
> >
> > As you said, setting request.required.acks=-1 only means that when a
> > success response has been returned you know that the message has been
> > received by all ISR replicas, but this has nothing to do with "at least
> > once" guarantee.
> >
> > Guozhang
> >
> >
> > On Fri, Oct 25, 2013 at 8:55 PM, Jason Rosenberg <jb...@squareup.com>
> wrote:
> >
> > > Just to clarify, I think in order to get 'at least once' guarantees,
> you
> > > must produce messages with 'request.required.acks=-1'.  Otherwise, you
> > > can't be 100% sure the message was received by all ISR replicas.
> > >
> > >
> > > On Fri, Oct 25, 2013 at 9:56 PM, Kane Kane <ka...@gmail.com>
> > wrote:
> > >
> > > > Thanks Guozhang, it makes sense if it's by design. Just wanted to
> > ensure
> > > > i'm not doing something wrong.
> > > >
> > > >
> > > > On Fri, Oct 25, 2013 at 5:57 PM, Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >
> > > > > As we have said, the timeout exception does not actually mean the
> > > message
> > > > > is not committed to the broker. When message.send.max.retries is 0
> > > Kafka
> > > > > does guarantee "at-most-once" which means that you will not have
> > > > > duplicates, but not means that all your exceptions can be treated
> as
> > > > > "message not delivered". In your case, 1480 - 1450 = 30 messages
> are
> > > the
> > > > > ones that are actually sent, not the ones that are duplicates.
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Oct 25, 2013 at 5:00 PM, Kane Kane <ka...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > There are a lot of exceptions, I will try to pick an example of
> > each:
> > > > > > ERROR async.DefaultEventHandler - Failed to send requests for
> > topics
> > > > > > benchmark with correlation ids in [879,881]
> > > > > > WARN  async.DefaultEventHandler - Produce request with
> correlation
> > id
> > > > 874
> > > > > > failed due to [benchmark,43]:
> kafka.common.RequestTimedOutException
> > > > > > WARN  client.ClientUtils$ - Fetching topic metadata with
> > correlation
> > > id
> > > > > 876
> > > > > > for topics [Set(benchmark)] from broker
> > > > > [id:2,host:10.80.42.156,port:9092]
> > > > > > failed
> > > > > > ERROR producer.SyncProducer - Producer connection to
> > > > > > 10.80.42.156:9092unsuccessful
> > > > > > kafka.common.FailedToSendMessageException: Failed to send
> messages
> > > > after
> > > > > 0
> > > > > > tries.
> > > > > > WARN  async.DefaultEventHandler - Failed to send producer request
> > > with
> > > > > > correlation id 270 to broker 0 with data for partitions
> > > [benchmark,42]
> > > > > >
> > > > > > I think these are all types of exceptions i see there.
> > > > > > Thanks.
> > > > > >
> > > > > >
> > > > > > On Fri, Oct 25, 2013 at 2:45 PM, Guozhang Wang <
> wangguoz@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Kane,
> > > > > > >
> > > > > > > If you set message.send.max.retries to 0 it should be
> > at-most-once,
> > > > > and I
> > > > > > > saw your props have the right config. What are the exceptions
> you
> > > got
> > > > > > from
> > > > > > > the send() call?
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Oct 25, 2013 at 12:54 PM, Steve Morin <
> > > steve@stevemorin.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Kane and Aniket,
> > > > > > > >   I am interested in knowing what the pattern/solution that
> > > people
> > > > > > > usually
> > > > > > > > use to implement exactly once as well.
> > > > > > > > -Steve
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Oct 25, 2013 at 11:39 AM, Kane Kane <
> > > kane.isturm@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Guozhang, but i've posted a piece from kafka documentation
> > > above:
> > > > > > > > > So effectively Kafka guarantees at-least-once delivery by
> > > default
> > > > > and
> > > > > > > > > allows the user to implement at most once delivery by
> > disabling
> > > > > > retries
> > > > > > > > on
> > > > > > > > > the producer.
> > > > > > > > >
> > > > > > > > > What i want is at-most-once and docs claim it's possible
> with
> > > > > certain
> > > > > > > > > settings. Did i miss anything here?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Oct 25, 2013 at 11:35 AM, Guozhang Wang <
> > > > > wangguoz@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Aniket is exactly right. In general, Kafka provides "at
> > least
> > > > > once"
> > > > > > > > > > guarantee instead of "exactly once".
> > > > > > > > > >
> > > > > > > > > > Guozhang
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
> > > > > > > > > > aniket.bhatnagar@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > As per my understanding, if the broker says the msg is
> > > > > committed,
> > > > > > > >  its
> > > > > > > > > > > guaranteed to have been committed as per ur ack config.
> > If
> > > it
> > > > > > says
> > > > > > > it
> > > > > > > > > did
> > > > > > > > > > > not get committed, then its very hard to figure out if
> > this
> > > > was
> > > > > > > just
> > > > > > > > a
> > > > > > > > > > > false error. Since there is concept of unique ids for
> > > > > messages, a
> > > > > > > > > replay
> > > > > > > > > > of
> > > > > > > > > > > the same message will result in duplication. I think
> its
> > a
> > > > > > > reasonable
> > > > > > > > > > > behaviour considering kafka prefers to append data to
> > > > > partitions
> > > > > > > fot
> > > > > > > > > > > performance reasons.
> > > > > > > > > > > The best way to right now deal with duplicate msgs is
> to
> > > > build
> > > > > > the
> > > > > > > > > > > processing engine (layer where your consumer sits) to
> > deal
> > > > with
> > > > > > at
> > > > > > > > > least
> > > > > > > > > > > once semantics of the broker.
> > > > > > > > > > > On 25 Oct 2013 23:23, "Kane Kane" <
> kane.isturm@gmail.com
> > >
> > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Or, to rephrase it more generally, is there a way to
> > know
> > > > > > exactly
> > > > > > > > if
> > > > > > > > > > > > message was committed or no?
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <
> > > > > > > kane.isturm@gmail.com
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hello Guozhang,
> > > > > > > > > > > > >
> > > > > > > > > > > > > My partitions are split almost evenly between
> broker,
> > > so,
> > > > > > yes -
> > > > > > > > > > broker
> > > > > > > > > > > > > that I shutdown is the leader for some of them.
> Does
> > it
> > > > > mean
> > > > > > i
> > > > > > > > can
> > > > > > > > > > get
> > > > > > > > > > > an
> > > > > > > > > > > > > exception and data is still being written? Is there
> > any
> > > > > > setting
> > > > > > > > on
> > > > > > > > > > the
> > > > > > > > > > > > > broker where i can control this? I.e. can i make
> > broker
> > > > > > > > replication
> > > > > > > > > > > > timeout
> > > > > > > > > > > > > shorter than producer timeout, so i can ensure if i
> > get
> > > > an
> > > > > > > > > exception
> > > > > > > > > > > data
> > > > > > > > > > > > > is not being committed?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <
> > > > > > > > > wangguoz@gmail.com
> > > > > > > > > > > > >wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > >> Hello Kane,
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> As discussed in the other thread, even if a
> timeout
> > > > > response
> > > > > > > is
> > > > > > > > > sent
> > > > > > > > > > > > back
> > > > > > > > > > > > >> to the producer, the message may still be
> committed.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Did you shut down the leader broker of the
> partition
> > > or
> > > > a
> > > > > > > > follower
> > > > > > > > > > > > broker?
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Guozhang
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <
> > > > > > > > kane.isturm@gmail.com
> > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> > I have cluster of 3 kafka brokers. With the
> > > following
> > > > > > > script I
> > > > > > > > > > send
> > > > > > > > > > > > some
> > > > > > > > > > > > >> > data to kafka and in the middle do the
> controlled
> > > > > shutdown
> > > > > > > of
> > > > > > > > 1
> > > > > > > > > > > > broker.
> > > > > > > > > > > > >> All
> > > > > > > > > > > > >> > 3 brokers are ISR before I start sending. When i
> > > > > shutdown
> > > > > > > the
> > > > > > > > > > > broker i
> > > > > > > > > > > > >> get
> > > > > > > > > > > > >> > a couple of exceptions and I expect data
> shouldn't
> > > be
> > > > > > > written.
> > > > > > > > > > Say,
> > > > > > > > > > > I
> > > > > > > > > > > > >> send
> > > > > > > > > > > > >> > 1500 lines and get 50 exceptions. I expect to
> > > consume
> > > > > 1450
> > > > > > > > > lines,
> > > > > > > > > > > but
> > > > > > > > > > > > >> > instead i always consume more, i.e. 1480 or
> 1490.
> > I
> > > > want
> > > > > > to
> > > > > > > > > decide
> > > > > > > > > > > if
> > > > > > > > > > > > I
> > > > > > > > > > > > >> > want to retry sending myself, not using
> > > > > > > > > message.send.max.retries.
> > > > > > > > > > > But
> > > > > > > > > > > > >> looks
> > > > > > > > > > > > >> > like if I retry sending if there is an exception
> > - I
> > > > > will
> > > > > > > end
> > > > > > > > up
> > > > > > > > > > > with
> > > > > > > > > > > > >> > duplicates. Is there anything I'm doing wrong or
> > > > having
> > > > > > > wrong
> > > > > > > > > > > > >> assumptions
> > > > > > > > > > > > >> > about kafka?
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Thanks.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > val prod = new MyProducer("10.80.42.147:9092,
> > > > > > > > 10.80.42.154:9092,
> > > > > > > > > > > > >> > 10.80.42.156:9092")
> > > > > > > > > > > > >> > var count = 0
> > > > > > > > > > > > >> > for(line <- Source.fromFile(file).getLines()){
> > > > > > > > > > > > >> >     try {
> > > > > > > > > > > > >> >       prod.send("benchmark", buffer.toList)
> > > > > > > > > > > > >> >       count += 1
> > > > > > > > > > > > >> >       println("sent %s", count)
> > > > > > > > > > > > >> >     } catch {
> > > > > > > > > > > > >> >       case _ => println("Exception!")
> > > > > > > > > > > > >> >     }
> > > > > > > > > > > > >> > }
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > class MyProducer(brokerList: String) {
> > > > > > > > > > > > >> >   val sync = true
> > > > > > > > > > > > >> >   val requestRequiredAcks = "-1"
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >   val props = new Properties()
> > > > > > > > > > > > >> >   props.put("metadata.broker.list", brokerList)
> > > > > > > > > > > > >> >   props.put("producer.type", if(sync) "sync"
> else
> > > > > "async")
> > > > > > > > > > > > >> >   props.put("request.required.acks",
> > > > > requestRequiredAcks)
> > > > > > > > > > > > >> >   props.put("key.serializer.class",
> > > > > > > > > > classOf[StringEncoder].getName)
> > > > > > > > > > > > >> >   props.put("serializer.class",
> > > > > > > > classOf[StringEncoder].getName)
> > > > > > > > > > > > >> >   props.put("message.send.max.retries", "0")
> > > > > > > > > > > > >> >   props.put("request.timeout.ms", "2000")
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >   val producer = new Producer[AnyRef,
> AnyRef](new
> > > > > > > > > > > > ProducerConfig(props))
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >   def send(topic: String, messages:
> List[String])
> > =
> > > {
> > > > > > > > > > > > >> >     val requests = new
> > > > ArrayBuffer[KeyedMessage[AnyRef,
> > > > > > > > AnyRef]]
> > > > > > > > > > > > >> >     for (message <- messages) {
> > > > > > > > > > > > >> >       requests += new KeyedMessage(topic, null,
> > > > message,
> > > > > > > > > message)
> > > > > > > > > > > > >> >     }
> > > > > > > > > > > > >> >     producer.send(requests)
> > > > > > > > > > > > >> >   }
> > > > > > > > > > > > >> > }
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> --
> > > > > > > > > > > > >> -- Guozhang
> > > > > > > > > > > > >>
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > -- Guozhang
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: producer exceptions when broker dies

Posted by Jason Rosenberg <jb...@squareup.com>.
Guozhang,

It turns out this is not entirely true, you do need request.required.acks =
-1 (and yes, you need to retry if failure) in order have guaranteed
delivery.

I discovered this, when doing tests with rolling restarts (and hard
restarts) of the kafka servers.  If the server goes down, e.g. if there's
any change in the ISR leader for a partition, then the only way you can be
sure what you produced will be available on a newly elected leader is to
use -1.

Jason




On Sat, Oct 26, 2013 at 12:11 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Jason,
>
> Setting request.required.acks=-1 is orthogonal to the 'at least once'
> guarantee, it only relates to the latency/replication trade-off. For
> example, even if you set request.required.acks to 1, and as long as you
> retry on all non-fatal exceptions you have the "at least once" guarantee;
> and even if you set request.required.acks to -1 and you do not retry, you
> will not get "at least once".
>
> As you said, setting request.required.acks=-1 only means that when a
> success response has been returned you know that the message has been
> received by all ISR replicas, but this has nothing to do with "at least
> once" guarantee.
>
> Guozhang
>
>
> On Fri, Oct 25, 2013 at 8:55 PM, Jason Rosenberg <jb...@squareup.com> wrote:
>
> > Just to clarify, I think in order to get 'at least once' guarantees, you
> > must produce messages with 'request.required.acks=-1'.  Otherwise, you
> > can't be 100% sure the message was received by all ISR replicas.
> >
> >
> > On Fri, Oct 25, 2013 at 9:56 PM, Kane Kane <ka...@gmail.com>
> wrote:
> >
> > > Thanks Guozhang, it makes sense if it's by design. Just wanted to
> ensure
> > > i'm not doing something wrong.
> > >
> > >
> > > On Fri, Oct 25, 2013 at 5:57 PM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > As we have said, the timeout exception does not actually mean the
> > message
> > > > is not committed to the broker. When message.send.max.retries is 0
> > Kafka
> > > > does guarantee "at-most-once" which means that you will not have
> > > > duplicates, but not means that all your exceptions can be treated as
> > > > "message not delivered". In your case, 1480 - 1450 = 30 messages are
> > the
> > > > ones that are actually sent, not the ones that are duplicates.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Fri, Oct 25, 2013 at 5:00 PM, Kane Kane <ka...@gmail.com>
> > > wrote:
> > > >
> > > > > There are a lot of exceptions, I will try to pick an example of
> each:
> > > > > ERROR async.DefaultEventHandler - Failed to send requests for
> topics
> > > > > benchmark with correlation ids in [879,881]
> > > > > WARN  async.DefaultEventHandler - Produce request with correlation
> id
> > > 874
> > > > > failed due to [benchmark,43]: kafka.common.RequestTimedOutException
> > > > > WARN  client.ClientUtils$ - Fetching topic metadata with
> correlation
> > id
> > > > 876
> > > > > for topics [Set(benchmark)] from broker
> > > > [id:2,host:10.80.42.156,port:9092]
> > > > > failed
> > > > > ERROR producer.SyncProducer - Producer connection to
> > > > > 10.80.42.156:9092unsuccessful
> > > > > kafka.common.FailedToSendMessageException: Failed to send messages
> > > after
> > > > 0
> > > > > tries.
> > > > > WARN  async.DefaultEventHandler - Failed to send producer request
> > with
> > > > > correlation id 270 to broker 0 with data for partitions
> > [benchmark,42]
> > > > >
> > > > > I think these are all types of exceptions i see there.
> > > > > Thanks.
> > > > >
> > > > >
> > > > > On Fri, Oct 25, 2013 at 2:45 PM, Guozhang Wang <wangguoz@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Kane,
> > > > > >
> > > > > > If you set message.send.max.retries to 0 it should be
> at-most-once,
> > > > and I
> > > > > > saw your props have the right config. What are the exceptions you
> > got
> > > > > from
> > > > > > the send() call?
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Fri, Oct 25, 2013 at 12:54 PM, Steve Morin <
> > steve@stevemorin.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Kane and Aniket,
> > > > > > >   I am interested in knowing what the pattern/solution that
> > people
> > > > > > usually
> > > > > > > use to implement exactly once as well.
> > > > > > > -Steve
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Oct 25, 2013 at 11:39 AM, Kane Kane <
> > kane.isturm@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Guozhang, but i've posted a piece from kafka documentation
> > above:
> > > > > > > > So effectively Kafka guarantees at-least-once delivery by
> > default
> > > > and
> > > > > > > > allows the user to implement at most once delivery by
> disabling
> > > > > retries
> > > > > > > on
> > > > > > > > the producer.
> > > > > > > >
> > > > > > > > What i want is at-most-once and docs claim it's possible with
> > > > certain
> > > > > > > > settings. Did i miss anything here?
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Oct 25, 2013 at 11:35 AM, Guozhang Wang <
> > > > wangguoz@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Aniket is exactly right. In general, Kafka provides "at
> least
> > > > once"
> > > > > > > > > guarantee instead of "exactly once".
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
> > > > > > > > > aniket.bhatnagar@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > As per my understanding, if the broker says the msg is
> > > > committed,
> > > > > > >  its
> > > > > > > > > > guaranteed to have been committed as per ur ack config.
> If
> > it
> > > > > says
> > > > > > it
> > > > > > > > did
> > > > > > > > > > not get committed, then its very hard to figure out if
> this
> > > was
> > > > > > just
> > > > > > > a
> > > > > > > > > > false error. Since there is concept of unique ids for
> > > > messages, a
> > > > > > > > replay
> > > > > > > > > of
> > > > > > > > > > the same message will result in duplication. I think its
> a
> > > > > > reasonable
> > > > > > > > > > behaviour considering kafka prefers to append data to
> > > > partitions
> > > > > > fot
> > > > > > > > > > performance reasons.
> > > > > > > > > > The best way to right now deal with duplicate msgs is to
> > > build
> > > > > the
> > > > > > > > > > processing engine (layer where your consumer sits) to
> deal
> > > with
> > > > > at
> > > > > > > > least
> > > > > > > > > > once semantics of the broker.
> > > > > > > > > > On 25 Oct 2013 23:23, "Kane Kane" <kane.isturm@gmail.com
> >
> > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Or, to rephrase it more generally, is there a way to
> know
> > > > > exactly
> > > > > > > if
> > > > > > > > > > > message was committed or no?
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <
> > > > > > kane.isturm@gmail.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hello Guozhang,
> > > > > > > > > > > >
> > > > > > > > > > > > My partitions are split almost evenly between broker,
> > so,
> > > > > yes -
> > > > > > > > > broker
> > > > > > > > > > > > that I shutdown is the leader for some of them. Does
> it
> > > > mean
> > > > > i
> > > > > > > can
> > > > > > > > > get
> > > > > > > > > > an
> > > > > > > > > > > > exception and data is still being written? Is there
> any
> > > > > setting
> > > > > > > on
> > > > > > > > > the
> > > > > > > > > > > > broker where i can control this? I.e. can i make
> broker
> > > > > > > replication
> > > > > > > > > > > timeout
> > > > > > > > > > > > shorter than producer timeout, so i can ensure if i
> get
> > > an
> > > > > > > > exception
> > > > > > > > > > data
> > > > > > > > > > > > is not being committed?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <
> > > > > > > > wangguoz@gmail.com
> > > > > > > > > > > >wrote:
> > > > > > > > > > > >
> > > > > > > > > > > >> Hello Kane,
> > > > > > > > > > > >>
> > > > > > > > > > > >> As discussed in the other thread, even if a timeout
> > > > response
> > > > > > is
> > > > > > > > sent
> > > > > > > > > > > back
> > > > > > > > > > > >> to the producer, the message may still be committed.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Did you shut down the leader broker of the partition
> > or
> > > a
> > > > > > > follower
> > > > > > > > > > > broker?
> > > > > > > > > > > >>
> > > > > > > > > > > >> Guozhang
> > > > > > > > > > > >>
> > > > > > > > > > > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <
> > > > > > > kane.isturm@gmail.com
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > >>
> > > > > > > > > > > >> > I have cluster of 3 kafka brokers. With the
> > following
> > > > > > script I
> > > > > > > > > send
> > > > > > > > > > > some
> > > > > > > > > > > >> > data to kafka and in the middle do the controlled
> > > > shutdown
> > > > > > of
> > > > > > > 1
> > > > > > > > > > > broker.
> > > > > > > > > > > >> All
> > > > > > > > > > > >> > 3 brokers are ISR before I start sending. When i
> > > > shutdown
> > > > > > the
> > > > > > > > > > broker i
> > > > > > > > > > > >> get
> > > > > > > > > > > >> > a couple of exceptions and I expect data shouldn't
> > be
> > > > > > written.
> > > > > > > > > Say,
> > > > > > > > > > I
> > > > > > > > > > > >> send
> > > > > > > > > > > >> > 1500 lines and get 50 exceptions. I expect to
> > consume
> > > > 1450
> > > > > > > > lines,
> > > > > > > > > > but
> > > > > > > > > > > >> > instead i always consume more, i.e. 1480 or 1490.
> I
> > > want
> > > > > to
> > > > > > > > decide
> > > > > > > > > > if
> > > > > > > > > > > I
> > > > > > > > > > > >> > want to retry sending myself, not using
> > > > > > > > message.send.max.retries.
> > > > > > > > > > But
> > > > > > > > > > > >> looks
> > > > > > > > > > > >> > like if I retry sending if there is an exception
> - I
> > > > will
> > > > > > end
> > > > > > > up
> > > > > > > > > > with
> > > > > > > > > > > >> > duplicates. Is there anything I'm doing wrong or
> > > having
> > > > > > wrong
> > > > > > > > > > > >> assumptions
> > > > > > > > > > > >> > about kafka?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Thanks.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > val prod = new MyProducer("10.80.42.147:9092,
> > > > > > > 10.80.42.154:9092,
> > > > > > > > > > > >> > 10.80.42.156:9092")
> > > > > > > > > > > >> > var count = 0
> > > > > > > > > > > >> > for(line <- Source.fromFile(file).getLines()){
> > > > > > > > > > > >> >     try {
> > > > > > > > > > > >> >       prod.send("benchmark", buffer.toList)
> > > > > > > > > > > >> >       count += 1
> > > > > > > > > > > >> >       println("sent %s", count)
> > > > > > > > > > > >> >     } catch {
> > > > > > > > > > > >> >       case _ => println("Exception!")
> > > > > > > > > > > >> >     }
> > > > > > > > > > > >> > }
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > class MyProducer(brokerList: String) {
> > > > > > > > > > > >> >   val sync = true
> > > > > > > > > > > >> >   val requestRequiredAcks = "-1"
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >   val props = new Properties()
> > > > > > > > > > > >> >   props.put("metadata.broker.list", brokerList)
> > > > > > > > > > > >> >   props.put("producer.type", if(sync) "sync" else
> > > > "async")
> > > > > > > > > > > >> >   props.put("request.required.acks",
> > > > requestRequiredAcks)
> > > > > > > > > > > >> >   props.put("key.serializer.class",
> > > > > > > > > classOf[StringEncoder].getName)
> > > > > > > > > > > >> >   props.put("serializer.class",
> > > > > > > classOf[StringEncoder].getName)
> > > > > > > > > > > >> >   props.put("message.send.max.retries", "0")
> > > > > > > > > > > >> >   props.put("request.timeout.ms", "2000")
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >   val producer = new Producer[AnyRef, AnyRef](new
> > > > > > > > > > > ProducerConfig(props))
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >   def send(topic: String, messages: List[String])
> =
> > {
> > > > > > > > > > > >> >     val requests = new
> > > ArrayBuffer[KeyedMessage[AnyRef,
> > > > > > > AnyRef]]
> > > > > > > > > > > >> >     for (message <- messages) {
> > > > > > > > > > > >> >       requests += new KeyedMessage(topic, null,
> > > message,
> > > > > > > > message)
> > > > > > > > > > > >> >     }
> > > > > > > > > > > >> >     producer.send(requests)
> > > > > > > > > > > >> >   }
> > > > > > > > > > > >> > }
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> --
> > > > > > > > > > > >> -- Guozhang
> > > > > > > > > > > >>
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > -- Guozhang
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: producer exceptions when broker dies

Posted by Guozhang Wang <wa...@gmail.com>.
Jason,

Setting request.required.acks=-1 is orthogonal to the 'at least once'
guarantee, it only relates to the latency/replication trade-off. For
example, even if you set request.required.acks to 1, and as long as you
retry on all non-fatal exceptions you have the "at least once" guarantee;
and even if you set request.required.acks to -1 and you do not retry, you
will not get "at least once".

As you said, setting request.required.acks=-1 only means that when a
success response has been returned you know that the message has been
received by all ISR replicas, but this has nothing to do with "at least
once" guarantee.

Guozhang


On Fri, Oct 25, 2013 at 8:55 PM, Jason Rosenberg <jb...@squareup.com> wrote:

> Just to clarify, I think in order to get 'at least once' guarantees, you
> must produce messages with 'request.required.acks=-1'.  Otherwise, you
> can't be 100% sure the message was received by all ISR replicas.
>
>
> On Fri, Oct 25, 2013 at 9:56 PM, Kane Kane <ka...@gmail.com> wrote:
>
> > Thanks Guozhang, it makes sense if it's by design. Just wanted to ensure
> > i'm not doing something wrong.
> >
> >
> > On Fri, Oct 25, 2013 at 5:57 PM, Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > As we have said, the timeout exception does not actually mean the
> message
> > > is not committed to the broker. When message.send.max.retries is 0
> Kafka
> > > does guarantee "at-most-once" which means that you will not have
> > > duplicates, but not means that all your exceptions can be treated as
> > > "message not delivered". In your case, 1480 - 1450 = 30 messages are
> the
> > > ones that are actually sent, not the ones that are duplicates.
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Oct 25, 2013 at 5:00 PM, Kane Kane <ka...@gmail.com>
> > wrote:
> > >
> > > > There are a lot of exceptions, I will try to pick an example of each:
> > > > ERROR async.DefaultEventHandler - Failed to send requests for topics
> > > > benchmark with correlation ids in [879,881]
> > > > WARN  async.DefaultEventHandler - Produce request with correlation id
> > 874
> > > > failed due to [benchmark,43]: kafka.common.RequestTimedOutException
> > > > WARN  client.ClientUtils$ - Fetching topic metadata with correlation
> id
> > > 876
> > > > for topics [Set(benchmark)] from broker
> > > [id:2,host:10.80.42.156,port:9092]
> > > > failed
> > > > ERROR producer.SyncProducer - Producer connection to
> > > > 10.80.42.156:9092unsuccessful
> > > > kafka.common.FailedToSendMessageException: Failed to send messages
> > after
> > > 0
> > > > tries.
> > > > WARN  async.DefaultEventHandler - Failed to send producer request
> with
> > > > correlation id 270 to broker 0 with data for partitions
> [benchmark,42]
> > > >
> > > > I think these are all types of exceptions i see there.
> > > > Thanks.
> > > >
> > > >
> > > > On Fri, Oct 25, 2013 at 2:45 PM, Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >
> > > > > Kane,
> > > > >
> > > > > If you set message.send.max.retries to 0 it should be at-most-once,
> > > and I
> > > > > saw your props have the right config. What are the exceptions you
> got
> > > > from
> > > > > the send() call?
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Oct 25, 2013 at 12:54 PM, Steve Morin <
> steve@stevemorin.com>
> > > > > wrote:
> > > > >
> > > > > > Kane and Aniket,
> > > > > >   I am interested in knowing what the pattern/solution that
> people
> > > > > usually
> > > > > > use to implement exactly once as well.
> > > > > > -Steve
> > > > > >
> > > > > >
> > > > > > On Fri, Oct 25, 2013 at 11:39 AM, Kane Kane <
> kane.isturm@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Guozhang, but i've posted a piece from kafka documentation
> above:
> > > > > > > So effectively Kafka guarantees at-least-once delivery by
> default
> > > and
> > > > > > > allows the user to implement at most once delivery by disabling
> > > > retries
> > > > > > on
> > > > > > > the producer.
> > > > > > >
> > > > > > > What i want is at-most-once and docs claim it's possible with
> > > certain
> > > > > > > settings. Did i miss anything here?
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Oct 25, 2013 at 11:35 AM, Guozhang Wang <
> > > wangguoz@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Aniket is exactly right. In general, Kafka provides "at least
> > > once"
> > > > > > > > guarantee instead of "exactly once".
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
> > > > > > > > aniket.bhatnagar@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > As per my understanding, if the broker says the msg is
> > > committed,
> > > > > >  its
> > > > > > > > > guaranteed to have been committed as per ur ack config. If
> it
> > > > says
> > > > > it
> > > > > > > did
> > > > > > > > > not get committed, then its very hard to figure out if this
> > was
> > > > > just
> > > > > > a
> > > > > > > > > false error. Since there is concept of unique ids for
> > > messages, a
> > > > > > > replay
> > > > > > > > of
> > > > > > > > > the same message will result in duplication. I think its a
> > > > > reasonable
> > > > > > > > > behaviour considering kafka prefers to append data to
> > > partitions
> > > > > fot
> > > > > > > > > performance reasons.
> > > > > > > > > The best way to right now deal with duplicate msgs is to
> > build
> > > > the
> > > > > > > > > processing engine (layer where your consumer sits) to deal
> > with
> > > > at
> > > > > > > least
> > > > > > > > > once semantics of the broker.
> > > > > > > > > On 25 Oct 2013 23:23, "Kane Kane" <ka...@gmail.com>
> > > wrote:
> > > > > > > > >
> > > > > > > > > > Or, to rephrase it more generally, is there a way to know
> > > > exactly
> > > > > > if
> > > > > > > > > > message was committed or no?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <
> > > > > kane.isturm@gmail.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hello Guozhang,
> > > > > > > > > > >
> > > > > > > > > > > My partitions are split almost evenly between broker,
> so,
> > > > yes -
> > > > > > > > broker
> > > > > > > > > > > that I shutdown is the leader for some of them. Does it
> > > mean
> > > > i
> > > > > > can
> > > > > > > > get
> > > > > > > > > an
> > > > > > > > > > > exception and data is still being written? Is there any
> > > > setting
> > > > > > on
> > > > > > > > the
> > > > > > > > > > > broker where i can control this? I.e. can i make broker
> > > > > > replication
> > > > > > > > > > timeout
> > > > > > > > > > > shorter than producer timeout, so i can ensure if i get
> > an
> > > > > > > exception
> > > > > > > > > data
> > > > > > > > > > > is not being committed?
> > > > > > > > > > >
> > > > > > > > > > > Thanks.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <
> > > > > > > wangguoz@gmail.com
> > > > > > > > > > >wrote:
> > > > > > > > > > >
> > > > > > > > > > >> Hello Kane,
> > > > > > > > > > >>
> > > > > > > > > > >> As discussed in the other thread, even if a timeout
> > > response
> > > > > is
> > > > > > > sent
> > > > > > > > > > back
> > > > > > > > > > >> to the producer, the message may still be committed.
> > > > > > > > > > >>
> > > > > > > > > > >> Did you shut down the leader broker of the partition
> or
> > a
> > > > > > follower
> > > > > > > > > > broker?
> > > > > > > > > > >>
> > > > > > > > > > >> Guozhang
> > > > > > > > > > >>
> > > > > > > > > > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <
> > > > > > kane.isturm@gmail.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > >>
> > > > > > > > > > >> > I have cluster of 3 kafka brokers. With the
> following
> > > > > script I
> > > > > > > > send
> > > > > > > > > > some
> > > > > > > > > > >> > data to kafka and in the middle do the controlled
> > > shutdown
> > > > > of
> > > > > > 1
> > > > > > > > > > broker.
> > > > > > > > > > >> All
> > > > > > > > > > >> > 3 brokers are ISR before I start sending. When i
> > > shutdown
> > > > > the
> > > > > > > > > broker i
> > > > > > > > > > >> get
> > > > > > > > > > >> > a couple of exceptions and I expect data shouldn't
> be
> > > > > written.
> > > > > > > > Say,
> > > > > > > > > I
> > > > > > > > > > >> send
> > > > > > > > > > >> > 1500 lines and get 50 exceptions. I expect to
> consume
> > > 1450
> > > > > > > lines,
> > > > > > > > > but
> > > > > > > > > > >> > instead i always consume more, i.e. 1480 or 1490. I
> > want
> > > > to
> > > > > > > decide
> > > > > > > > > if
> > > > > > > > > > I
> > > > > > > > > > >> > want to retry sending myself, not using
> > > > > > > message.send.max.retries.
> > > > > > > > > But
> > > > > > > > > > >> looks
> > > > > > > > > > >> > like if I retry sending if there is an exception - I
> > > will
> > > > > end
> > > > > > up
> > > > > > > > > with
> > > > > > > > > > >> > duplicates. Is there anything I'm doing wrong or
> > having
> > > > > wrong
> > > > > > > > > > >> assumptions
> > > > > > > > > > >> > about kafka?
> > > > > > > > > > >> >
> > > > > > > > > > >> > Thanks.
> > > > > > > > > > >> >
> > > > > > > > > > >> > val prod = new MyProducer("10.80.42.147:9092,
> > > > > > 10.80.42.154:9092,
> > > > > > > > > > >> > 10.80.42.156:9092")
> > > > > > > > > > >> > var count = 0
> > > > > > > > > > >> > for(line <- Source.fromFile(file).getLines()){
> > > > > > > > > > >> >     try {
> > > > > > > > > > >> >       prod.send("benchmark", buffer.toList)
> > > > > > > > > > >> >       count += 1
> > > > > > > > > > >> >       println("sent %s", count)
> > > > > > > > > > >> >     } catch {
> > > > > > > > > > >> >       case _ => println("Exception!")
> > > > > > > > > > >> >     }
> > > > > > > > > > >> > }
> > > > > > > > > > >> >
> > > > > > > > > > >> > class MyProducer(brokerList: String) {
> > > > > > > > > > >> >   val sync = true
> > > > > > > > > > >> >   val requestRequiredAcks = "-1"
> > > > > > > > > > >> >
> > > > > > > > > > >> >   val props = new Properties()
> > > > > > > > > > >> >   props.put("metadata.broker.list", brokerList)
> > > > > > > > > > >> >   props.put("producer.type", if(sync) "sync" else
> > > "async")
> > > > > > > > > > >> >   props.put("request.required.acks",
> > > requestRequiredAcks)
> > > > > > > > > > >> >   props.put("key.serializer.class",
> > > > > > > > classOf[StringEncoder].getName)
> > > > > > > > > > >> >   props.put("serializer.class",
> > > > > > classOf[StringEncoder].getName)
> > > > > > > > > > >> >   props.put("message.send.max.retries", "0")
> > > > > > > > > > >> >   props.put("request.timeout.ms", "2000")
> > > > > > > > > > >> >
> > > > > > > > > > >> >   val producer = new Producer[AnyRef, AnyRef](new
> > > > > > > > > > ProducerConfig(props))
> > > > > > > > > > >> >
> > > > > > > > > > >> >   def send(topic: String, messages: List[String]) =
> {
> > > > > > > > > > >> >     val requests = new
> > ArrayBuffer[KeyedMessage[AnyRef,
> > > > > > AnyRef]]
> > > > > > > > > > >> >     for (message <- messages) {
> > > > > > > > > > >> >       requests += new KeyedMessage(topic, null,
> > message,
> > > > > > > message)
> > > > > > > > > > >> >     }
> > > > > > > > > > >> >     producer.send(requests)
> > > > > > > > > > >> >   }
> > > > > > > > > > >> > }
> > > > > > > > > > >> >
> > > > > > > > > > >>
> > > > > > > > > > >>
> > > > > > > > > > >>
> > > > > > > > > > >> --
> > > > > > > > > > >> -- Guozhang
> > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>



-- 
-- Guozhang

Re: producer exceptions when broker dies

Posted by Jason Rosenberg <jb...@squareup.com>.
Just to clarify, I think in order to get 'at least once' guarantees, you
must produce messages with 'request.required.acks=-1'.  Otherwise, you
can't be 100% sure the message was received by all ISR replicas.


On Fri, Oct 25, 2013 at 9:56 PM, Kane Kane <ka...@gmail.com> wrote:

> Thanks Guozhang, it makes sense if it's by design. Just wanted to ensure
> i'm not doing something wrong.
>
>
> On Fri, Oct 25, 2013 at 5:57 PM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > As we have said, the timeout exception does not actually mean the message
> > is not committed to the broker. When message.send.max.retries is 0 Kafka
> > does guarantee "at-most-once" which means that you will not have
> > duplicates, but not means that all your exceptions can be treated as
> > "message not delivered". In your case, 1480 - 1450 = 30 messages are the
> > ones that are actually sent, not the ones that are duplicates.
> >
> > Guozhang
> >
> >
> > On Fri, Oct 25, 2013 at 5:00 PM, Kane Kane <ka...@gmail.com>
> wrote:
> >
> > > There are a lot of exceptions, I will try to pick an example of each:
> > > ERROR async.DefaultEventHandler - Failed to send requests for topics
> > > benchmark with correlation ids in [879,881]
> > > WARN  async.DefaultEventHandler - Produce request with correlation id
> 874
> > > failed due to [benchmark,43]: kafka.common.RequestTimedOutException
> > > WARN  client.ClientUtils$ - Fetching topic metadata with correlation id
> > 876
> > > for topics [Set(benchmark)] from broker
> > [id:2,host:10.80.42.156,port:9092]
> > > failed
> > > ERROR producer.SyncProducer - Producer connection to
> > > 10.80.42.156:9092unsuccessful
> > > kafka.common.FailedToSendMessageException: Failed to send messages
> after
> > 0
> > > tries.
> > > WARN  async.DefaultEventHandler - Failed to send producer request with
> > > correlation id 270 to broker 0 with data for partitions [benchmark,42]
> > >
> > > I think these are all types of exceptions i see there.
> > > Thanks.
> > >
> > >
> > > On Fri, Oct 25, 2013 at 2:45 PM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > Kane,
> > > >
> > > > If you set message.send.max.retries to 0 it should be at-most-once,
> > and I
> > > > saw your props have the right config. What are the exceptions you got
> > > from
> > > > the send() call?
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Fri, Oct 25, 2013 at 12:54 PM, Steve Morin <st...@stevemorin.com>
> > > > wrote:
> > > >
> > > > > Kane and Aniket,
> > > > >   I am interested in knowing what the pattern/solution that people
> > > > usually
> > > > > use to implement exactly once as well.
> > > > > -Steve
> > > > >
> > > > >
> > > > > On Fri, Oct 25, 2013 at 11:39 AM, Kane Kane <kane.isturm@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Guozhang, but i've posted a piece from kafka documentation above:
> > > > > > So effectively Kafka guarantees at-least-once delivery by default
> > and
> > > > > > allows the user to implement at most once delivery by disabling
> > > retries
> > > > > on
> > > > > > the producer.
> > > > > >
> > > > > > What i want is at-most-once and docs claim it's possible with
> > certain
> > > > > > settings. Did i miss anything here?
> > > > > >
> > > > > >
> > > > > > On Fri, Oct 25, 2013 at 11:35 AM, Guozhang Wang <
> > wangguoz@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Aniket is exactly right. In general, Kafka provides "at least
> > once"
> > > > > > > guarantee instead of "exactly once".
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
> > > > > > > aniket.bhatnagar@gmail.com> wrote:
> > > > > > >
> > > > > > > > As per my understanding, if the broker says the msg is
> > committed,
> > > > >  its
> > > > > > > > guaranteed to have been committed as per ur ack config. If it
> > > says
> > > > it
> > > > > > did
> > > > > > > > not get committed, then its very hard to figure out if this
> was
> > > > just
> > > > > a
> > > > > > > > false error. Since there is concept of unique ids for
> > messages, a
> > > > > > replay
> > > > > > > of
> > > > > > > > the same message will result in duplication. I think its a
> > > > reasonable
> > > > > > > > behaviour considering kafka prefers to append data to
> > partitions
> > > > fot
> > > > > > > > performance reasons.
> > > > > > > > The best way to right now deal with duplicate msgs is to
> build
> > > the
> > > > > > > > processing engine (layer where your consumer sits) to deal
> with
> > > at
> > > > > > least
> > > > > > > > once semantics of the broker.
> > > > > > > > On 25 Oct 2013 23:23, "Kane Kane" <ka...@gmail.com>
> > wrote:
> > > > > > > >
> > > > > > > > > Or, to rephrase it more generally, is there a way to know
> > > exactly
> > > > > if
> > > > > > > > > message was committed or no?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <
> > > > kane.isturm@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hello Guozhang,
> > > > > > > > > >
> > > > > > > > > > My partitions are split almost evenly between broker, so,
> > > yes -
> > > > > > > broker
> > > > > > > > > > that I shutdown is the leader for some of them. Does it
> > mean
> > > i
> > > > > can
> > > > > > > get
> > > > > > > > an
> > > > > > > > > > exception and data is still being written? Is there any
> > > setting
> > > > > on
> > > > > > > the
> > > > > > > > > > broker where i can control this? I.e. can i make broker
> > > > > replication
> > > > > > > > > timeout
> > > > > > > > > > shorter than producer timeout, so i can ensure if i get
> an
> > > > > > exception
> > > > > > > > data
> > > > > > > > > > is not being committed?
> > > > > > > > > >
> > > > > > > > > > Thanks.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <
> > > > > > wangguoz@gmail.com
> > > > > > > > > >wrote:
> > > > > > > > > >
> > > > > > > > > >> Hello Kane,
> > > > > > > > > >>
> > > > > > > > > >> As discussed in the other thread, even if a timeout
> > response
> > > > is
> > > > > > sent
> > > > > > > > > back
> > > > > > > > > >> to the producer, the message may still be committed.
> > > > > > > > > >>
> > > > > > > > > >> Did you shut down the leader broker of the partition or
> a
> > > > > follower
> > > > > > > > > broker?
> > > > > > > > > >>
> > > > > > > > > >> Guozhang
> > > > > > > > > >>
> > > > > > > > > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <
> > > > > kane.isturm@gmail.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >>
> > > > > > > > > >> > I have cluster of 3 kafka brokers. With the following
> > > > script I
> > > > > > > send
> > > > > > > > > some
> > > > > > > > > >> > data to kafka and in the middle do the controlled
> > shutdown
> > > > of
> > > > > 1
> > > > > > > > > broker.
> > > > > > > > > >> All
> > > > > > > > > >> > 3 brokers are ISR before I start sending. When i
> > shutdown
> > > > the
> > > > > > > > broker i
> > > > > > > > > >> get
> > > > > > > > > >> > a couple of exceptions and I expect data shouldn't be
> > > > written.
> > > > > > > Say,
> > > > > > > > I
> > > > > > > > > >> send
> > > > > > > > > >> > 1500 lines and get 50 exceptions. I expect to consume
> > 1450
> > > > > > lines,
> > > > > > > > but
> > > > > > > > > >> > instead i always consume more, i.e. 1480 or 1490. I
> want
> > > to
> > > > > > decide
> > > > > > > > if
> > > > > > > > > I
> > > > > > > > > >> > want to retry sending myself, not using
> > > > > > message.send.max.retries.
> > > > > > > > But
> > > > > > > > > >> looks
> > > > > > > > > >> > like if I retry sending if there is an exception - I
> > will
> > > > end
> > > > > up
> > > > > > > > with
> > > > > > > > > >> > duplicates. Is there anything I'm doing wrong or
> having
> > > > wrong
> > > > > > > > > >> assumptions
> > > > > > > > > >> > about kafka?
> > > > > > > > > >> >
> > > > > > > > > >> > Thanks.
> > > > > > > > > >> >
> > > > > > > > > >> > val prod = new MyProducer("10.80.42.147:9092,
> > > > > 10.80.42.154:9092,
> > > > > > > > > >> > 10.80.42.156:9092")
> > > > > > > > > >> > var count = 0
> > > > > > > > > >> > for(line <- Source.fromFile(file).getLines()){
> > > > > > > > > >> >     try {
> > > > > > > > > >> >       prod.send("benchmark", buffer.toList)
> > > > > > > > > >> >       count += 1
> > > > > > > > > >> >       println("sent %s", count)
> > > > > > > > > >> >     } catch {
> > > > > > > > > >> >       case _ => println("Exception!")
> > > > > > > > > >> >     }
> > > > > > > > > >> > }
> > > > > > > > > >> >
> > > > > > > > > >> > class MyProducer(brokerList: String) {
> > > > > > > > > >> >   val sync = true
> > > > > > > > > >> >   val requestRequiredAcks = "-1"
> > > > > > > > > >> >
> > > > > > > > > >> >   val props = new Properties()
> > > > > > > > > >> >   props.put("metadata.broker.list", brokerList)
> > > > > > > > > >> >   props.put("producer.type", if(sync) "sync" else
> > "async")
> > > > > > > > > >> >   props.put("request.required.acks",
> > requestRequiredAcks)
> > > > > > > > > >> >   props.put("key.serializer.class",
> > > > > > > classOf[StringEncoder].getName)
> > > > > > > > > >> >   props.put("serializer.class",
> > > > > classOf[StringEncoder].getName)
> > > > > > > > > >> >   props.put("message.send.max.retries", "0")
> > > > > > > > > >> >   props.put("request.timeout.ms", "2000")
> > > > > > > > > >> >
> > > > > > > > > >> >   val producer = new Producer[AnyRef, AnyRef](new
> > > > > > > > > ProducerConfig(props))
> > > > > > > > > >> >
> > > > > > > > > >> >   def send(topic: String, messages: List[String]) = {
> > > > > > > > > >> >     val requests = new
> ArrayBuffer[KeyedMessage[AnyRef,
> > > > > AnyRef]]
> > > > > > > > > >> >     for (message <- messages) {
> > > > > > > > > >> >       requests += new KeyedMessage(topic, null,
> message,
> > > > > > message)
> > > > > > > > > >> >     }
> > > > > > > > > >> >     producer.send(requests)
> > > > > > > > > >> >   }
> > > > > > > > > >> > }
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> --
> > > > > > > > > >> -- Guozhang
> > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>

Re: producer exceptions when broker dies

Posted by Kane Kane <ka...@gmail.com>.
Thanks Guozhang, it makes sense if it's by design. Just wanted to ensure
i'm not doing something wrong.


On Fri, Oct 25, 2013 at 5:57 PM, Guozhang Wang <wa...@gmail.com> wrote:

> As we have said, the timeout exception does not actually mean the message
> is not committed to the broker. When message.send.max.retries is 0 Kafka
> does guarantee "at-most-once" which means that you will not have
> duplicates, but not means that all your exceptions can be treated as
> "message not delivered". In your case, 1480 - 1450 = 30 messages are the
> ones that are actually sent, not the ones that are duplicates.
>
> Guozhang
>
>
> On Fri, Oct 25, 2013 at 5:00 PM, Kane Kane <ka...@gmail.com> wrote:
>
> > There are a lot of exceptions, I will try to pick an example of each:
> > ERROR async.DefaultEventHandler - Failed to send requests for topics
> > benchmark with correlation ids in [879,881]
> > WARN  async.DefaultEventHandler - Produce request with correlation id 874
> > failed due to [benchmark,43]: kafka.common.RequestTimedOutException
> > WARN  client.ClientUtils$ - Fetching topic metadata with correlation id
> 876
> > for topics [Set(benchmark)] from broker
> [id:2,host:10.80.42.156,port:9092]
> > failed
> > ERROR producer.SyncProducer - Producer connection to
> > 10.80.42.156:9092unsuccessful
> > kafka.common.FailedToSendMessageException: Failed to send messages after
> 0
> > tries.
> > WARN  async.DefaultEventHandler - Failed to send producer request with
> > correlation id 270 to broker 0 with data for partitions [benchmark,42]
> >
> > I think these are all types of exceptions i see there.
> > Thanks.
> >
> >
> > On Fri, Oct 25, 2013 at 2:45 PM, Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > Kane,
> > >
> > > If you set message.send.max.retries to 0 it should be at-most-once,
> and I
> > > saw your props have the right config. What are the exceptions you got
> > from
> > > the send() call?
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Oct 25, 2013 at 12:54 PM, Steve Morin <st...@stevemorin.com>
> > > wrote:
> > >
> > > > Kane and Aniket,
> > > >   I am interested in knowing what the pattern/solution that people
> > > usually
> > > > use to implement exactly once as well.
> > > > -Steve
> > > >
> > > >
> > > > On Fri, Oct 25, 2013 at 11:39 AM, Kane Kane <ka...@gmail.com>
> > > wrote:
> > > >
> > > > > Guozhang, but i've posted a piece from kafka documentation above:
> > > > > So effectively Kafka guarantees at-least-once delivery by default
> and
> > > > > allows the user to implement at most once delivery by disabling
> > retries
> > > > on
> > > > > the producer.
> > > > >
> > > > > What i want is at-most-once and docs claim it's possible with
> certain
> > > > > settings. Did i miss anything here?
> > > > >
> > > > >
> > > > > On Fri, Oct 25, 2013 at 11:35 AM, Guozhang Wang <
> wangguoz@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Aniket is exactly right. In general, Kafka provides "at least
> once"
> > > > > > guarantee instead of "exactly once".
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
> > > > > > aniket.bhatnagar@gmail.com> wrote:
> > > > > >
> > > > > > > As per my understanding, if the broker says the msg is
> committed,
> > > >  its
> > > > > > > guaranteed to have been committed as per ur ack config. If it
> > says
> > > it
> > > > > did
> > > > > > > not get committed, then its very hard to figure out if this was
> > > just
> > > > a
> > > > > > > false error. Since there is concept of unique ids for
> messages, a
> > > > > replay
> > > > > > of
> > > > > > > the same message will result in duplication. I think its a
> > > reasonable
> > > > > > > behaviour considering kafka prefers to append data to
> partitions
> > > fot
> > > > > > > performance reasons.
> > > > > > > The best way to right now deal with duplicate msgs is to build
> > the
> > > > > > > processing engine (layer where your consumer sits) to deal with
> > at
> > > > > least
> > > > > > > once semantics of the broker.
> > > > > > > On 25 Oct 2013 23:23, "Kane Kane" <ka...@gmail.com>
> wrote:
> > > > > > >
> > > > > > > > Or, to rephrase it more generally, is there a way to know
> > exactly
> > > > if
> > > > > > > > message was committed or no?
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <
> > > kane.isturm@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hello Guozhang,
> > > > > > > > >
> > > > > > > > > My partitions are split almost evenly between broker, so,
> > yes -
> > > > > > broker
> > > > > > > > > that I shutdown is the leader for some of them. Does it
> mean
> > i
> > > > can
> > > > > > get
> > > > > > > an
> > > > > > > > > exception and data is still being written? Is there any
> > setting
> > > > on
> > > > > > the
> > > > > > > > > broker where i can control this? I.e. can i make broker
> > > > replication
> > > > > > > > timeout
> > > > > > > > > shorter than producer timeout, so i can ensure if i get an
> > > > > exception
> > > > > > > data
> > > > > > > > > is not being committed?
> > > > > > > > >
> > > > > > > > > Thanks.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <
> > > > > wangguoz@gmail.com
> > > > > > > > >wrote:
> > > > > > > > >
> > > > > > > > >> Hello Kane,
> > > > > > > > >>
> > > > > > > > >> As discussed in the other thread, even if a timeout
> response
> > > is
> > > > > sent
> > > > > > > > back
> > > > > > > > >> to the producer, the message may still be committed.
> > > > > > > > >>
> > > > > > > > >> Did you shut down the leader broker of the partition or a
> > > > follower
> > > > > > > > broker?
> > > > > > > > >>
> > > > > > > > >> Guozhang
> > > > > > > > >>
> > > > > > > > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <
> > > > kane.isturm@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >>
> > > > > > > > >> > I have cluster of 3 kafka brokers. With the following
> > > script I
> > > > > > send
> > > > > > > > some
> > > > > > > > >> > data to kafka and in the middle do the controlled
> shutdown
> > > of
> > > > 1
> > > > > > > > broker.
> > > > > > > > >> All
> > > > > > > > >> > 3 brokers are ISR before I start sending. When i
> shutdown
> > > the
> > > > > > > broker i
> > > > > > > > >> get
> > > > > > > > >> > a couple of exceptions and I expect data shouldn't be
> > > written.
> > > > > > Say,
> > > > > > > I
> > > > > > > > >> send
> > > > > > > > >> > 1500 lines and get 50 exceptions. I expect to consume
> 1450
> > > > > lines,
> > > > > > > but
> > > > > > > > >> > instead i always consume more, i.e. 1480 or 1490. I want
> > to
> > > > > decide
> > > > > > > if
> > > > > > > > I
> > > > > > > > >> > want to retry sending myself, not using
> > > > > message.send.max.retries.
> > > > > > > But
> > > > > > > > >> looks
> > > > > > > > >> > like if I retry sending if there is an exception - I
> will
> > > end
> > > > up
> > > > > > > with
> > > > > > > > >> > duplicates. Is there anything I'm doing wrong or having
> > > wrong
> > > > > > > > >> assumptions
> > > > > > > > >> > about kafka?
> > > > > > > > >> >
> > > > > > > > >> > Thanks.
> > > > > > > > >> >
> > > > > > > > >> > val prod = new MyProducer("10.80.42.147:9092,
> > > > 10.80.42.154:9092,
> > > > > > > > >> > 10.80.42.156:9092")
> > > > > > > > >> > var count = 0
> > > > > > > > >> > for(line <- Source.fromFile(file).getLines()){
> > > > > > > > >> >     try {
> > > > > > > > >> >       prod.send("benchmark", buffer.toList)
> > > > > > > > >> >       count += 1
> > > > > > > > >> >       println("sent %s", count)
> > > > > > > > >> >     } catch {
> > > > > > > > >> >       case _ => println("Exception!")
> > > > > > > > >> >     }
> > > > > > > > >> > }
> > > > > > > > >> >
> > > > > > > > >> > class MyProducer(brokerList: String) {
> > > > > > > > >> >   val sync = true
> > > > > > > > >> >   val requestRequiredAcks = "-1"
> > > > > > > > >> >
> > > > > > > > >> >   val props = new Properties()
> > > > > > > > >> >   props.put("metadata.broker.list", brokerList)
> > > > > > > > >> >   props.put("producer.type", if(sync) "sync" else
> "async")
> > > > > > > > >> >   props.put("request.required.acks",
> requestRequiredAcks)
> > > > > > > > >> >   props.put("key.serializer.class",
> > > > > > classOf[StringEncoder].getName)
> > > > > > > > >> >   props.put("serializer.class",
> > > > classOf[StringEncoder].getName)
> > > > > > > > >> >   props.put("message.send.max.retries", "0")
> > > > > > > > >> >   props.put("request.timeout.ms", "2000")
> > > > > > > > >> >
> > > > > > > > >> >   val producer = new Producer[AnyRef, AnyRef](new
> > > > > > > > ProducerConfig(props))
> > > > > > > > >> >
> > > > > > > > >> >   def send(topic: String, messages: List[String]) = {
> > > > > > > > >> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef,
> > > > AnyRef]]
> > > > > > > > >> >     for (message <- messages) {
> > > > > > > > >> >       requests += new KeyedMessage(topic, null, message,
> > > > > message)
> > > > > > > > >> >     }
> > > > > > > > >> >     producer.send(requests)
> > > > > > > > >> >   }
> > > > > > > > >> > }
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> --
> > > > > > > > >> -- Guozhang
> > > > > > > > >>
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: producer exceptions when broker dies

Posted by Guozhang Wang <wa...@gmail.com>.
As we have said, the timeout exception does not actually mean the message
is not committed to the broker. When message.send.max.retries is 0 Kafka
does guarantee "at-most-once" which means that you will not have
duplicates, but not means that all your exceptions can be treated as
"message not delivered". In your case, 1480 - 1450 = 30 messages are the
ones that are actually sent, not the ones that are duplicates.

Guozhang


On Fri, Oct 25, 2013 at 5:00 PM, Kane Kane <ka...@gmail.com> wrote:

> There are a lot of exceptions, I will try to pick an example of each:
> ERROR async.DefaultEventHandler - Failed to send requests for topics
> benchmark with correlation ids in [879,881]
> WARN  async.DefaultEventHandler - Produce request with correlation id 874
> failed due to [benchmark,43]: kafka.common.RequestTimedOutException
> WARN  client.ClientUtils$ - Fetching topic metadata with correlation id 876
> for topics [Set(benchmark)] from broker [id:2,host:10.80.42.156,port:9092]
> failed
> ERROR producer.SyncProducer - Producer connection to
> 10.80.42.156:9092unsuccessful
> kafka.common.FailedToSendMessageException: Failed to send messages after 0
> tries.
> WARN  async.DefaultEventHandler - Failed to send producer request with
> correlation id 270 to broker 0 with data for partitions [benchmark,42]
>
> I think these are all types of exceptions i see there.
> Thanks.
>
>
> On Fri, Oct 25, 2013 at 2:45 PM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > Kane,
> >
> > If you set message.send.max.retries to 0 it should be at-most-once, and I
> > saw your props have the right config. What are the exceptions you got
> from
> > the send() call?
> >
> > Guozhang
> >
> >
> > On Fri, Oct 25, 2013 at 12:54 PM, Steve Morin <st...@stevemorin.com>
> > wrote:
> >
> > > Kane and Aniket,
> > >   I am interested in knowing what the pattern/solution that people
> > usually
> > > use to implement exactly once as well.
> > > -Steve
> > >
> > >
> > > On Fri, Oct 25, 2013 at 11:39 AM, Kane Kane <ka...@gmail.com>
> > wrote:
> > >
> > > > Guozhang, but i've posted a piece from kafka documentation above:
> > > > So effectively Kafka guarantees at-least-once delivery by default and
> > > > allows the user to implement at most once delivery by disabling
> retries
> > > on
> > > > the producer.
> > > >
> > > > What i want is at-most-once and docs claim it's possible with certain
> > > > settings. Did i miss anything here?
> > > >
> > > >
> > > > On Fri, Oct 25, 2013 at 11:35 AM, Guozhang Wang <wa...@gmail.com>
> > > > wrote:
> > > >
> > > > > Aniket is exactly right. In general, Kafka provides "at least once"
> > > > > guarantee instead of "exactly once".
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
> > > > > aniket.bhatnagar@gmail.com> wrote:
> > > > >
> > > > > > As per my understanding, if the broker says the msg is committed,
> > >  its
> > > > > > guaranteed to have been committed as per ur ack config. If it
> says
> > it
> > > > did
> > > > > > not get committed, then its very hard to figure out if this was
> > just
> > > a
> > > > > > false error. Since there is concept of unique ids for messages, a
> > > > replay
> > > > > of
> > > > > > the same message will result in duplication. I think its a
> > reasonable
> > > > > > behaviour considering kafka prefers to append data to partitions
> > fot
> > > > > > performance reasons.
> > > > > > The best way to right now deal with duplicate msgs is to build
> the
> > > > > > processing engine (layer where your consumer sits) to deal with
> at
> > > > least
> > > > > > once semantics of the broker.
> > > > > > On 25 Oct 2013 23:23, "Kane Kane" <ka...@gmail.com> wrote:
> > > > > >
> > > > > > > Or, to rephrase it more generally, is there a way to know
> exactly
> > > if
> > > > > > > message was committed or no?
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <
> > kane.isturm@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hello Guozhang,
> > > > > > > >
> > > > > > > > My partitions are split almost evenly between broker, so,
> yes -
> > > > > broker
> > > > > > > > that I shutdown is the leader for some of them. Does it mean
> i
> > > can
> > > > > get
> > > > > > an
> > > > > > > > exception and data is still being written? Is there any
> setting
> > > on
> > > > > the
> > > > > > > > broker where i can control this? I.e. can i make broker
> > > replication
> > > > > > > timeout
> > > > > > > > shorter than producer timeout, so i can ensure if i get an
> > > > exception
> > > > > > data
> > > > > > > > is not being committed?
> > > > > > > >
> > > > > > > > Thanks.
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <
> > > > wangguoz@gmail.com
> > > > > > > >wrote:
> > > > > > > >
> > > > > > > >> Hello Kane,
> > > > > > > >>
> > > > > > > >> As discussed in the other thread, even if a timeout response
> > is
> > > > sent
> > > > > > > back
> > > > > > > >> to the producer, the message may still be committed.
> > > > > > > >>
> > > > > > > >> Did you shut down the leader broker of the partition or a
> > > follower
> > > > > > > broker?
> > > > > > > >>
> > > > > > > >> Guozhang
> > > > > > > >>
> > > > > > > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <
> > > kane.isturm@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > >>
> > > > > > > >> > I have cluster of 3 kafka brokers. With the following
> > script I
> > > > > send
> > > > > > > some
> > > > > > > >> > data to kafka and in the middle do the controlled shutdown
> > of
> > > 1
> > > > > > > broker.
> > > > > > > >> All
> > > > > > > >> > 3 brokers are ISR before I start sending. When i shutdown
> > the
> > > > > > broker i
> > > > > > > >> get
> > > > > > > >> > a couple of exceptions and I expect data shouldn't be
> > written.
> > > > > Say,
> > > > > > I
> > > > > > > >> send
> > > > > > > >> > 1500 lines and get 50 exceptions. I expect to consume 1450
> > > > lines,
> > > > > > but
> > > > > > > >> > instead i always consume more, i.e. 1480 or 1490. I want
> to
> > > > decide
> > > > > > if
> > > > > > > I
> > > > > > > >> > want to retry sending myself, not using
> > > > message.send.max.retries.
> > > > > > But
> > > > > > > >> looks
> > > > > > > >> > like if I retry sending if there is an exception - I will
> > end
> > > up
> > > > > > with
> > > > > > > >> > duplicates. Is there anything I'm doing wrong or having
> > wrong
> > > > > > > >> assumptions
> > > > > > > >> > about kafka?
> > > > > > > >> >
> > > > > > > >> > Thanks.
> > > > > > > >> >
> > > > > > > >> > val prod = new MyProducer("10.80.42.147:9092,
> > > 10.80.42.154:9092,
> > > > > > > >> > 10.80.42.156:9092")
> > > > > > > >> > var count = 0
> > > > > > > >> > for(line <- Source.fromFile(file).getLines()){
> > > > > > > >> >     try {
> > > > > > > >> >       prod.send("benchmark", buffer.toList)
> > > > > > > >> >       count += 1
> > > > > > > >> >       println("sent %s", count)
> > > > > > > >> >     } catch {
> > > > > > > >> >       case _ => println("Exception!")
> > > > > > > >> >     }
> > > > > > > >> > }
> > > > > > > >> >
> > > > > > > >> > class MyProducer(brokerList: String) {
> > > > > > > >> >   val sync = true
> > > > > > > >> >   val requestRequiredAcks = "-1"
> > > > > > > >> >
> > > > > > > >> >   val props = new Properties()
> > > > > > > >> >   props.put("metadata.broker.list", brokerList)
> > > > > > > >> >   props.put("producer.type", if(sync) "sync" else "async")
> > > > > > > >> >   props.put("request.required.acks", requestRequiredAcks)
> > > > > > > >> >   props.put("key.serializer.class",
> > > > > classOf[StringEncoder].getName)
> > > > > > > >> >   props.put("serializer.class",
> > > classOf[StringEncoder].getName)
> > > > > > > >> >   props.put("message.send.max.retries", "0")
> > > > > > > >> >   props.put("request.timeout.ms", "2000")
> > > > > > > >> >
> > > > > > > >> >   val producer = new Producer[AnyRef, AnyRef](new
> > > > > > > ProducerConfig(props))
> > > > > > > >> >
> > > > > > > >> >   def send(topic: String, messages: List[String]) = {
> > > > > > > >> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef,
> > > AnyRef]]
> > > > > > > >> >     for (message <- messages) {
> > > > > > > >> >       requests += new KeyedMessage(topic, null, message,
> > > > message)
> > > > > > > >> >     }
> > > > > > > >> >     producer.send(requests)
> > > > > > > >> >   }
> > > > > > > >> > }
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> --
> > > > > > > >> -- Guozhang
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: producer exceptions when broker dies

Posted by Kane Kane <ka...@gmail.com>.
There are a lot of exceptions, I will try to pick an example of each:
ERROR async.DefaultEventHandler - Failed to send requests for topics
benchmark with correlation ids in [879,881]
WARN  async.DefaultEventHandler - Produce request with correlation id 874
failed due to [benchmark,43]: kafka.common.RequestTimedOutException
WARN  client.ClientUtils$ - Fetching topic metadata with correlation id 876
for topics [Set(benchmark)] from broker [id:2,host:10.80.42.156,port:9092]
failed
ERROR producer.SyncProducer - Producer connection to
10.80.42.156:9092unsuccessful
kafka.common.FailedToSendMessageException: Failed to send messages after 0
tries.
WARN  async.DefaultEventHandler - Failed to send producer request with
correlation id 270 to broker 0 with data for partitions [benchmark,42]

I think these are all types of exceptions i see there.
Thanks.


On Fri, Oct 25, 2013 at 2:45 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Kane,
>
> If you set message.send.max.retries to 0 it should be at-most-once, and I
> saw your props have the right config. What are the exceptions you got from
> the send() call?
>
> Guozhang
>
>
> On Fri, Oct 25, 2013 at 12:54 PM, Steve Morin <st...@stevemorin.com>
> wrote:
>
> > Kane and Aniket,
> >   I am interested in knowing what the pattern/solution that people
> usually
> > use to implement exactly once as well.
> > -Steve
> >
> >
> > On Fri, Oct 25, 2013 at 11:39 AM, Kane Kane <ka...@gmail.com>
> wrote:
> >
> > > Guozhang, but i've posted a piece from kafka documentation above:
> > > So effectively Kafka guarantees at-least-once delivery by default and
> > > allows the user to implement at most once delivery by disabling retries
> > on
> > > the producer.
> > >
> > > What i want is at-most-once and docs claim it's possible with certain
> > > settings. Did i miss anything here?
> > >
> > >
> > > On Fri, Oct 25, 2013 at 11:35 AM, Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > >
> > > > Aniket is exactly right. In general, Kafka provides "at least once"
> > > > guarantee instead of "exactly once".
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
> > > > aniket.bhatnagar@gmail.com> wrote:
> > > >
> > > > > As per my understanding, if the broker says the msg is committed,
> >  its
> > > > > guaranteed to have been committed as per ur ack config. If it says
> it
> > > did
> > > > > not get committed, then its very hard to figure out if this was
> just
> > a
> > > > > false error. Since there is concept of unique ids for messages, a
> > > replay
> > > > of
> > > > > the same message will result in duplication. I think its a
> reasonable
> > > > > behaviour considering kafka prefers to append data to partitions
> fot
> > > > > performance reasons.
> > > > > The best way to right now deal with duplicate msgs is to build the
> > > > > processing engine (layer where your consumer sits) to deal with at
> > > least
> > > > > once semantics of the broker.
> > > > > On 25 Oct 2013 23:23, "Kane Kane" <ka...@gmail.com> wrote:
> > > > >
> > > > > > Or, to rephrase it more generally, is there a way to know exactly
> > if
> > > > > > message was committed or no?
> > > > > >
> > > > > >
> > > > > > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <
> kane.isturm@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Hello Guozhang,
> > > > > > >
> > > > > > > My partitions are split almost evenly between broker, so, yes -
> > > > broker
> > > > > > > that I shutdown is the leader for some of them. Does it mean i
> > can
> > > > get
> > > > > an
> > > > > > > exception and data is still being written? Is there any setting
> > on
> > > > the
> > > > > > > broker where i can control this? I.e. can i make broker
> > replication
> > > > > > timeout
> > > > > > > shorter than producer timeout, so i can ensure if i get an
> > > exception
> > > > > data
> > > > > > > is not being committed?
> > > > > > >
> > > > > > > Thanks.
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <
> > > wangguoz@gmail.com
> > > > > > >wrote:
> > > > > > >
> > > > > > >> Hello Kane,
> > > > > > >>
> > > > > > >> As discussed in the other thread, even if a timeout response
> is
> > > sent
> > > > > > back
> > > > > > >> to the producer, the message may still be committed.
> > > > > > >>
> > > > > > >> Did you shut down the leader broker of the partition or a
> > follower
> > > > > > broker?
> > > > > > >>
> > > > > > >> Guozhang
> > > > > > >>
> > > > > > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <
> > kane.isturm@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >>
> > > > > > >> > I have cluster of 3 kafka brokers. With the following
> script I
> > > > send
> > > > > > some
> > > > > > >> > data to kafka and in the middle do the controlled shutdown
> of
> > 1
> > > > > > broker.
> > > > > > >> All
> > > > > > >> > 3 brokers are ISR before I start sending. When i shutdown
> the
> > > > > broker i
> > > > > > >> get
> > > > > > >> > a couple of exceptions and I expect data shouldn't be
> written.
> > > > Say,
> > > > > I
> > > > > > >> send
> > > > > > >> > 1500 lines and get 50 exceptions. I expect to consume 1450
> > > lines,
> > > > > but
> > > > > > >> > instead i always consume more, i.e. 1480 or 1490. I want to
> > > decide
> > > > > if
> > > > > > I
> > > > > > >> > want to retry sending myself, not using
> > > message.send.max.retries.
> > > > > But
> > > > > > >> looks
> > > > > > >> > like if I retry sending if there is an exception - I will
> end
> > up
> > > > > with
> > > > > > >> > duplicates. Is there anything I'm doing wrong or having
> wrong
> > > > > > >> assumptions
> > > > > > >> > about kafka?
> > > > > > >> >
> > > > > > >> > Thanks.
> > > > > > >> >
> > > > > > >> > val prod = new MyProducer("10.80.42.147:9092,
> > 10.80.42.154:9092,
> > > > > > >> > 10.80.42.156:9092")
> > > > > > >> > var count = 0
> > > > > > >> > for(line <- Source.fromFile(file).getLines()){
> > > > > > >> >     try {
> > > > > > >> >       prod.send("benchmark", buffer.toList)
> > > > > > >> >       count += 1
> > > > > > >> >       println("sent %s", count)
> > > > > > >> >     } catch {
> > > > > > >> >       case _ => println("Exception!")
> > > > > > >> >     }
> > > > > > >> > }
> > > > > > >> >
> > > > > > >> > class MyProducer(brokerList: String) {
> > > > > > >> >   val sync = true
> > > > > > >> >   val requestRequiredAcks = "-1"
> > > > > > >> >
> > > > > > >> >   val props = new Properties()
> > > > > > >> >   props.put("metadata.broker.list", brokerList)
> > > > > > >> >   props.put("producer.type", if(sync) "sync" else "async")
> > > > > > >> >   props.put("request.required.acks", requestRequiredAcks)
> > > > > > >> >   props.put("key.serializer.class",
> > > > classOf[StringEncoder].getName)
> > > > > > >> >   props.put("serializer.class",
> > classOf[StringEncoder].getName)
> > > > > > >> >   props.put("message.send.max.retries", "0")
> > > > > > >> >   props.put("request.timeout.ms", "2000")
> > > > > > >> >
> > > > > > >> >   val producer = new Producer[AnyRef, AnyRef](new
> > > > > > ProducerConfig(props))
> > > > > > >> >
> > > > > > >> >   def send(topic: String, messages: List[String]) = {
> > > > > > >> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef,
> > AnyRef]]
> > > > > > >> >     for (message <- messages) {
> > > > > > >> >       requests += new KeyedMessage(topic, null, message,
> > > message)
> > > > > > >> >     }
> > > > > > >> >     producer.send(requests)
> > > > > > >> >   }
> > > > > > >> > }
> > > > > > >> >
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> --
> > > > > > >> -- Guozhang
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: producer exceptions when broker dies

Posted by Guozhang Wang <wa...@gmail.com>.
Kane,

If you set message.send.max.retries to 0 it should be at-most-once, and I
saw your props have the right config. What are the exceptions you got from
the send() call?

Guozhang


On Fri, Oct 25, 2013 at 12:54 PM, Steve Morin <st...@stevemorin.com> wrote:

> Kane and Aniket,
>   I am interested in knowing what the pattern/solution that people usually
> use to implement exactly once as well.
> -Steve
>
>
> On Fri, Oct 25, 2013 at 11:39 AM, Kane Kane <ka...@gmail.com> wrote:
>
> > Guozhang, but i've posted a piece from kafka documentation above:
> > So effectively Kafka guarantees at-least-once delivery by default and
> > allows the user to implement at most once delivery by disabling retries
> on
> > the producer.
> >
> > What i want is at-most-once and docs claim it's possible with certain
> > settings. Did i miss anything here?
> >
> >
> > On Fri, Oct 25, 2013 at 11:35 AM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> >
> > > Aniket is exactly right. In general, Kafka provides "at least once"
> > > guarantee instead of "exactly once".
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
> > > aniket.bhatnagar@gmail.com> wrote:
> > >
> > > > As per my understanding, if the broker says the msg is committed,
>  its
> > > > guaranteed to have been committed as per ur ack config. If it says it
> > did
> > > > not get committed, then its very hard to figure out if this was just
> a
> > > > false error. Since there is concept of unique ids for messages, a
> > replay
> > > of
> > > > the same message will result in duplication. I think its a reasonable
> > > > behaviour considering kafka prefers to append data to partitions fot
> > > > performance reasons.
> > > > The best way to right now deal with duplicate msgs is to build the
> > > > processing engine (layer where your consumer sits) to deal with at
> > least
> > > > once semantics of the broker.
> > > > On 25 Oct 2013 23:23, "Kane Kane" <ka...@gmail.com> wrote:
> > > >
> > > > > Or, to rephrase it more generally, is there a way to know exactly
> if
> > > > > message was committed or no?
> > > > >
> > > > >
> > > > > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <kane.isturm@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Hello Guozhang,
> > > > > >
> > > > > > My partitions are split almost evenly between broker, so, yes -
> > > broker
> > > > > > that I shutdown is the leader for some of them. Does it mean i
> can
> > > get
> > > > an
> > > > > > exception and data is still being written? Is there any setting
> on
> > > the
> > > > > > broker where i can control this? I.e. can i make broker
> replication
> > > > > timeout
> > > > > > shorter than producer timeout, so i can ensure if i get an
> > exception
> > > > data
> > > > > > is not being committed?
> > > > > >
> > > > > > Thanks.
> > > > > >
> > > > > >
> > > > > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <
> > wangguoz@gmail.com
> > > > > >wrote:
> > > > > >
> > > > > >> Hello Kane,
> > > > > >>
> > > > > >> As discussed in the other thread, even if a timeout response is
> > sent
> > > > > back
> > > > > >> to the producer, the message may still be committed.
> > > > > >>
> > > > > >> Did you shut down the leader broker of the partition or a
> follower
> > > > > broker?
> > > > > >>
> > > > > >> Guozhang
> > > > > >>
> > > > > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <
> kane.isturm@gmail.com
> > >
> > > > > wrote:
> > > > > >>
> > > > > >> > I have cluster of 3 kafka brokers. With the following script I
> > > send
> > > > > some
> > > > > >> > data to kafka and in the middle do the controlled shutdown of
> 1
> > > > > broker.
> > > > > >> All
> > > > > >> > 3 brokers are ISR before I start sending. When i shutdown the
> > > > broker i
> > > > > >> get
> > > > > >> > a couple of exceptions and I expect data shouldn't be written.
> > > Say,
> > > > I
> > > > > >> send
> > > > > >> > 1500 lines and get 50 exceptions. I expect to consume 1450
> > lines,
> > > > but
> > > > > >> > instead i always consume more, i.e. 1480 or 1490. I want to
> > decide
> > > > if
> > > > > I
> > > > > >> > want to retry sending myself, not using
> > message.send.max.retries.
> > > > But
> > > > > >> looks
> > > > > >> > like if I retry sending if there is an exception - I will end
> up
> > > > with
> > > > > >> > duplicates. Is there anything I'm doing wrong or having wrong
> > > > > >> assumptions
> > > > > >> > about kafka?
> > > > > >> >
> > > > > >> > Thanks.
> > > > > >> >
> > > > > >> > val prod = new MyProducer("10.80.42.147:9092,
> 10.80.42.154:9092,
> > > > > >> > 10.80.42.156:9092")
> > > > > >> > var count = 0
> > > > > >> > for(line <- Source.fromFile(file).getLines()){
> > > > > >> >     try {
> > > > > >> >       prod.send("benchmark", buffer.toList)
> > > > > >> >       count += 1
> > > > > >> >       println("sent %s", count)
> > > > > >> >     } catch {
> > > > > >> >       case _ => println("Exception!")
> > > > > >> >     }
> > > > > >> > }
> > > > > >> >
> > > > > >> > class MyProducer(brokerList: String) {
> > > > > >> >   val sync = true
> > > > > >> >   val requestRequiredAcks = "-1"
> > > > > >> >
> > > > > >> >   val props = new Properties()
> > > > > >> >   props.put("metadata.broker.list", brokerList)
> > > > > >> >   props.put("producer.type", if(sync) "sync" else "async")
> > > > > >> >   props.put("request.required.acks", requestRequiredAcks)
> > > > > >> >   props.put("key.serializer.class",
> > > classOf[StringEncoder].getName)
> > > > > >> >   props.put("serializer.class",
> classOf[StringEncoder].getName)
> > > > > >> >   props.put("message.send.max.retries", "0")
> > > > > >> >   props.put("request.timeout.ms", "2000")
> > > > > >> >
> > > > > >> >   val producer = new Producer[AnyRef, AnyRef](new
> > > > > ProducerConfig(props))
> > > > > >> >
> > > > > >> >   def send(topic: String, messages: List[String]) = {
> > > > > >> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef,
> AnyRef]]
> > > > > >> >     for (message <- messages) {
> > > > > >> >       requests += new KeyedMessage(topic, null, message,
> > message)
> > > > > >> >     }
> > > > > >> >     producer.send(requests)
> > > > > >> >   }
> > > > > >> > }
> > > > > >> >
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> --
> > > > > >> -- Guozhang
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>



-- 
-- Guozhang

Re: producer exceptions when broker dies

Posted by Steve Morin <st...@stevemorin.com>.
Kane and Aniket,
  I am interested in knowing what the pattern/solution that people usually
use to implement exactly once as well.
-Steve


On Fri, Oct 25, 2013 at 11:39 AM, Kane Kane <ka...@gmail.com> wrote:

> Guozhang, but i've posted a piece from kafka documentation above:
> So effectively Kafka guarantees at-least-once delivery by default and
> allows the user to implement at most once delivery by disabling retries on
> the producer.
>
> What i want is at-most-once and docs claim it's possible with certain
> settings. Did i miss anything here?
>
>
> On Fri, Oct 25, 2013 at 11:35 AM, Guozhang Wang <wa...@gmail.com>
> wrote:
>
> > Aniket is exactly right. In general, Kafka provides "at least once"
> > guarantee instead of "exactly once".
> >
> > Guozhang
> >
> >
> > On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
> > aniket.bhatnagar@gmail.com> wrote:
> >
> > > As per my understanding, if the broker says the msg is committed,  its
> > > guaranteed to have been committed as per ur ack config. If it says it
> did
> > > not get committed, then its very hard to figure out if this was just a
> > > false error. Since there is concept of unique ids for messages, a
> replay
> > of
> > > the same message will result in duplication. I think its a reasonable
> > > behaviour considering kafka prefers to append data to partitions fot
> > > performance reasons.
> > > The best way to right now deal with duplicate msgs is to build the
> > > processing engine (layer where your consumer sits) to deal with at
> least
> > > once semantics of the broker.
> > > On 25 Oct 2013 23:23, "Kane Kane" <ka...@gmail.com> wrote:
> > >
> > > > Or, to rephrase it more generally, is there a way to know exactly if
> > > > message was committed or no?
> > > >
> > > >
> > > > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <ka...@gmail.com>
> > > wrote:
> > > >
> > > > > Hello Guozhang,
> > > > >
> > > > > My partitions are split almost evenly between broker, so, yes -
> > broker
> > > > > that I shutdown is the leader for some of them. Does it mean i can
> > get
> > > an
> > > > > exception and data is still being written? Is there any setting on
> > the
> > > > > broker where i can control this? I.e. can i make broker replication
> > > > timeout
> > > > > shorter than producer timeout, so i can ensure if i get an
> exception
> > > data
> > > > > is not being committed?
> > > > >
> > > > > Thanks.
> > > > >
> > > > >
> > > > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <
> wangguoz@gmail.com
> > > > >wrote:
> > > > >
> > > > >> Hello Kane,
> > > > >>
> > > > >> As discussed in the other thread, even if a timeout response is
> sent
> > > > back
> > > > >> to the producer, the message may still be committed.
> > > > >>
> > > > >> Did you shut down the leader broker of the partition or a follower
> > > > broker?
> > > > >>
> > > > >> Guozhang
> > > > >>
> > > > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <kane.isturm@gmail.com
> >
> > > > wrote:
> > > > >>
> > > > >> > I have cluster of 3 kafka brokers. With the following script I
> > send
> > > > some
> > > > >> > data to kafka and in the middle do the controlled shutdown of 1
> > > > broker.
> > > > >> All
> > > > >> > 3 brokers are ISR before I start sending. When i shutdown the
> > > broker i
> > > > >> get
> > > > >> > a couple of exceptions and I expect data shouldn't be written.
> > Say,
> > > I
> > > > >> send
> > > > >> > 1500 lines and get 50 exceptions. I expect to consume 1450
> lines,
> > > but
> > > > >> > instead i always consume more, i.e. 1480 or 1490. I want to
> decide
> > > if
> > > > I
> > > > >> > want to retry sending myself, not using
> message.send.max.retries.
> > > But
> > > > >> looks
> > > > >> > like if I retry sending if there is an exception - I will end up
> > > with
> > > > >> > duplicates. Is there anything I'm doing wrong or having wrong
> > > > >> assumptions
> > > > >> > about kafka?
> > > > >> >
> > > > >> > Thanks.
> > > > >> >
> > > > >> > val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,
> > > > >> > 10.80.42.156:9092")
> > > > >> > var count = 0
> > > > >> > for(line <- Source.fromFile(file).getLines()){
> > > > >> >     try {
> > > > >> >       prod.send("benchmark", buffer.toList)
> > > > >> >       count += 1
> > > > >> >       println("sent %s", count)
> > > > >> >     } catch {
> > > > >> >       case _ => println("Exception!")
> > > > >> >     }
> > > > >> > }
> > > > >> >
> > > > >> > class MyProducer(brokerList: String) {
> > > > >> >   val sync = true
> > > > >> >   val requestRequiredAcks = "-1"
> > > > >> >
> > > > >> >   val props = new Properties()
> > > > >> >   props.put("metadata.broker.list", brokerList)
> > > > >> >   props.put("producer.type", if(sync) "sync" else "async")
> > > > >> >   props.put("request.required.acks", requestRequiredAcks)
> > > > >> >   props.put("key.serializer.class",
> > classOf[StringEncoder].getName)
> > > > >> >   props.put("serializer.class", classOf[StringEncoder].getName)
> > > > >> >   props.put("message.send.max.retries", "0")
> > > > >> >   props.put("request.timeout.ms", "2000")
> > > > >> >
> > > > >> >   val producer = new Producer[AnyRef, AnyRef](new
> > > > ProducerConfig(props))
> > > > >> >
> > > > >> >   def send(topic: String, messages: List[String]) = {
> > > > >> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
> > > > >> >     for (message <- messages) {
> > > > >> >       requests += new KeyedMessage(topic, null, message,
> message)
> > > > >> >     }
> > > > >> >     producer.send(requests)
> > > > >> >   }
> > > > >> > }
> > > > >> >
> > > > >>
> > > > >>
> > > > >>
> > > > >> --
> > > > >> -- Guozhang
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>

Re: producer exceptions when broker dies

Posted by Kane Kane <ka...@gmail.com>.
Guozhang, but i've posted a piece from kafka documentation above:
So effectively Kafka guarantees at-least-once delivery by default and
allows the user to implement at most once delivery by disabling retries on
the producer.

What i want is at-most-once and docs claim it's possible with certain
settings. Did i miss anything here?


On Fri, Oct 25, 2013 at 11:35 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Aniket is exactly right. In general, Kafka provides "at least once"
> guarantee instead of "exactly once".
>
> Guozhang
>
>
> On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
> aniket.bhatnagar@gmail.com> wrote:
>
> > As per my understanding, if the broker says the msg is committed,  its
> > guaranteed to have been committed as per ur ack config. If it says it did
> > not get committed, then its very hard to figure out if this was just a
> > false error. Since there is concept of unique ids for messages, a replay
> of
> > the same message will result in duplication. I think its a reasonable
> > behaviour considering kafka prefers to append data to partitions fot
> > performance reasons.
> > The best way to right now deal with duplicate msgs is to build the
> > processing engine (layer where your consumer sits) to deal with at least
> > once semantics of the broker.
> > On 25 Oct 2013 23:23, "Kane Kane" <ka...@gmail.com> wrote:
> >
> > > Or, to rephrase it more generally, is there a way to know exactly if
> > > message was committed or no?
> > >
> > >
> > > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <ka...@gmail.com>
> > wrote:
> > >
> > > > Hello Guozhang,
> > > >
> > > > My partitions are split almost evenly between broker, so, yes -
> broker
> > > > that I shutdown is the leader for some of them. Does it mean i can
> get
> > an
> > > > exception and data is still being written? Is there any setting on
> the
> > > > broker where i can control this? I.e. can i make broker replication
> > > timeout
> > > > shorter than producer timeout, so i can ensure if i get an exception
> > data
> > > > is not being committed?
> > > >
> > > > Thanks.
> > > >
> > > >
> > > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <wangguoz@gmail.com
> > > >wrote:
> > > >
> > > >> Hello Kane,
> > > >>
> > > >> As discussed in the other thread, even if a timeout response is sent
> > > back
> > > >> to the producer, the message may still be committed.
> > > >>
> > > >> Did you shut down the leader broker of the partition or a follower
> > > broker?
> > > >>
> > > >> Guozhang
> > > >>
> > > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <ka...@gmail.com>
> > > wrote:
> > > >>
> > > >> > I have cluster of 3 kafka brokers. With the following script I
> send
> > > some
> > > >> > data to kafka and in the middle do the controlled shutdown of 1
> > > broker.
> > > >> All
> > > >> > 3 brokers are ISR before I start sending. When i shutdown the
> > broker i
> > > >> get
> > > >> > a couple of exceptions and I expect data shouldn't be written.
> Say,
> > I
> > > >> send
> > > >> > 1500 lines and get 50 exceptions. I expect to consume 1450 lines,
> > but
> > > >> > instead i always consume more, i.e. 1480 or 1490. I want to decide
> > if
> > > I
> > > >> > want to retry sending myself, not using message.send.max.retries.
> > But
> > > >> looks
> > > >> > like if I retry sending if there is an exception - I will end up
> > with
> > > >> > duplicates. Is there anything I'm doing wrong or having wrong
> > > >> assumptions
> > > >> > about kafka?
> > > >> >
> > > >> > Thanks.
> > > >> >
> > > >> > val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,
> > > >> > 10.80.42.156:9092")
> > > >> > var count = 0
> > > >> > for(line <- Source.fromFile(file).getLines()){
> > > >> >     try {
> > > >> >       prod.send("benchmark", buffer.toList)
> > > >> >       count += 1
> > > >> >       println("sent %s", count)
> > > >> >     } catch {
> > > >> >       case _ => println("Exception!")
> > > >> >     }
> > > >> > }
> > > >> >
> > > >> > class MyProducer(brokerList: String) {
> > > >> >   val sync = true
> > > >> >   val requestRequiredAcks = "-1"
> > > >> >
> > > >> >   val props = new Properties()
> > > >> >   props.put("metadata.broker.list", brokerList)
> > > >> >   props.put("producer.type", if(sync) "sync" else "async")
> > > >> >   props.put("request.required.acks", requestRequiredAcks)
> > > >> >   props.put("key.serializer.class",
> classOf[StringEncoder].getName)
> > > >> >   props.put("serializer.class", classOf[StringEncoder].getName)
> > > >> >   props.put("message.send.max.retries", "0")
> > > >> >   props.put("request.timeout.ms", "2000")
> > > >> >
> > > >> >   val producer = new Producer[AnyRef, AnyRef](new
> > > ProducerConfig(props))
> > > >> >
> > > >> >   def send(topic: String, messages: List[String]) = {
> > > >> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
> > > >> >     for (message <- messages) {
> > > >> >       requests += new KeyedMessage(topic, null, message, message)
> > > >> >     }
> > > >> >     producer.send(requests)
> > > >> >   }
> > > >> > }
> > > >> >
> > > >>
> > > >>
> > > >>
> > > >> --
> > > >> -- Guozhang
> > > >>
> > > >
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: producer exceptions when broker dies

Posted by Guozhang Wang <wa...@gmail.com>.
Aniket is exactly right. In general, Kafka provides "at least once"
guarantee instead of "exactly once".

Guozhang


On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
aniket.bhatnagar@gmail.com> wrote:

> As per my understanding, if the broker says the msg is committed,  its
> guaranteed to have been committed as per ur ack config. If it says it did
> not get committed, then its very hard to figure out if this was just a
> false error. Since there is concept of unique ids for messages, a replay of
> the same message will result in duplication. I think its a reasonable
> behaviour considering kafka prefers to append data to partitions fot
> performance reasons.
> The best way to right now deal with duplicate msgs is to build the
> processing engine (layer where your consumer sits) to deal with at least
> once semantics of the broker.
> On 25 Oct 2013 23:23, "Kane Kane" <ka...@gmail.com> wrote:
>
> > Or, to rephrase it more generally, is there a way to know exactly if
> > message was committed or no?
> >
> >
> > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <ka...@gmail.com>
> wrote:
> >
> > > Hello Guozhang,
> > >
> > > My partitions are split almost evenly between broker, so, yes - broker
> > > that I shutdown is the leader for some of them. Does it mean i can get
> an
> > > exception and data is still being written? Is there any setting on the
> > > broker where i can control this? I.e. can i make broker replication
> > timeout
> > > shorter than producer timeout, so i can ensure if i get an exception
> data
> > > is not being committed?
> > >
> > > Thanks.
> > >
> > >
> > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <wangguoz@gmail.com
> > >wrote:
> > >
> > >> Hello Kane,
> > >>
> > >> As discussed in the other thread, even if a timeout response is sent
> > back
> > >> to the producer, the message may still be committed.
> > >>
> > >> Did you shut down the leader broker of the partition or a follower
> > broker?
> > >>
> > >> Guozhang
> > >>
> > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <ka...@gmail.com>
> > wrote:
> > >>
> > >> > I have cluster of 3 kafka brokers. With the following script I send
> > some
> > >> > data to kafka and in the middle do the controlled shutdown of 1
> > broker.
> > >> All
> > >> > 3 brokers are ISR before I start sending. When i shutdown the
> broker i
> > >> get
> > >> > a couple of exceptions and I expect data shouldn't be written. Say,
> I
> > >> send
> > >> > 1500 lines and get 50 exceptions. I expect to consume 1450 lines,
> but
> > >> > instead i always consume more, i.e. 1480 or 1490. I want to decide
> if
> > I
> > >> > want to retry sending myself, not using message.send.max.retries.
> But
> > >> looks
> > >> > like if I retry sending if there is an exception - I will end up
> with
> > >> > duplicates. Is there anything I'm doing wrong or having wrong
> > >> assumptions
> > >> > about kafka?
> > >> >
> > >> > Thanks.
> > >> >
> > >> > val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,
> > >> > 10.80.42.156:9092")
> > >> > var count = 0
> > >> > for(line <- Source.fromFile(file).getLines()){
> > >> >     try {
> > >> >       prod.send("benchmark", buffer.toList)
> > >> >       count += 1
> > >> >       println("sent %s", count)
> > >> >     } catch {
> > >> >       case _ => println("Exception!")
> > >> >     }
> > >> > }
> > >> >
> > >> > class MyProducer(brokerList: String) {
> > >> >   val sync = true
> > >> >   val requestRequiredAcks = "-1"
> > >> >
> > >> >   val props = new Properties()
> > >> >   props.put("metadata.broker.list", brokerList)
> > >> >   props.put("producer.type", if(sync) "sync" else "async")
> > >> >   props.put("request.required.acks", requestRequiredAcks)
> > >> >   props.put("key.serializer.class", classOf[StringEncoder].getName)
> > >> >   props.put("serializer.class", classOf[StringEncoder].getName)
> > >> >   props.put("message.send.max.retries", "0")
> > >> >   props.put("request.timeout.ms", "2000")
> > >> >
> > >> >   val producer = new Producer[AnyRef, AnyRef](new
> > ProducerConfig(props))
> > >> >
> > >> >   def send(topic: String, messages: List[String]) = {
> > >> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
> > >> >     for (message <- messages) {
> > >> >       requests += new KeyedMessage(topic, null, message, message)
> > >> >     }
> > >> >     producer.send(requests)
> > >> >   }
> > >> > }
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
>



-- 
-- Guozhang

Re: producer exceptions when broker dies

Posted by Aniket Bhatnagar <an...@gmail.com>.
As per my understanding, if the broker says the msg is committed,  its
guaranteed to have been committed as per ur ack config. If it says it did
not get committed, then its very hard to figure out if this was just a
false error. Since there is concept of unique ids for messages, a replay of
the same message will result in duplication. I think its a reasonable
behaviour considering kafka prefers to append data to partitions fot
performance reasons.
The best way to right now deal with duplicate msgs is to build the
processing engine (layer where your consumer sits) to deal with at least
once semantics of the broker.
On 25 Oct 2013 23:23, "Kane Kane" <ka...@gmail.com> wrote:

> Or, to rephrase it more generally, is there a way to know exactly if
> message was committed or no?
>
>
> On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <ka...@gmail.com> wrote:
>
> > Hello Guozhang,
> >
> > My partitions are split almost evenly between broker, so, yes - broker
> > that I shutdown is the leader for some of them. Does it mean i can get an
> > exception and data is still being written? Is there any setting on the
> > broker where i can control this? I.e. can i make broker replication
> timeout
> > shorter than producer timeout, so i can ensure if i get an exception data
> > is not being committed?
> >
> > Thanks.
> >
> >
> > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <wangguoz@gmail.com
> >wrote:
> >
> >> Hello Kane,
> >>
> >> As discussed in the other thread, even if a timeout response is sent
> back
> >> to the producer, the message may still be committed.
> >>
> >> Did you shut down the leader broker of the partition or a follower
> broker?
> >>
> >> Guozhang
> >>
> >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <ka...@gmail.com>
> wrote:
> >>
> >> > I have cluster of 3 kafka brokers. With the following script I send
> some
> >> > data to kafka and in the middle do the controlled shutdown of 1
> broker.
> >> All
> >> > 3 brokers are ISR before I start sending. When i shutdown the broker i
> >> get
> >> > a couple of exceptions and I expect data shouldn't be written. Say, I
> >> send
> >> > 1500 lines and get 50 exceptions. I expect to consume 1450 lines, but
> >> > instead i always consume more, i.e. 1480 or 1490. I want to decide if
> I
> >> > want to retry sending myself, not using message.send.max.retries. But
> >> looks
> >> > like if I retry sending if there is an exception - I will end up with
> >> > duplicates. Is there anything I'm doing wrong or having wrong
> >> assumptions
> >> > about kafka?
> >> >
> >> > Thanks.
> >> >
> >> > val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,
> >> > 10.80.42.156:9092")
> >> > var count = 0
> >> > for(line <- Source.fromFile(file).getLines()){
> >> >     try {
> >> >       prod.send("benchmark", buffer.toList)
> >> >       count += 1
> >> >       println("sent %s", count)
> >> >     } catch {
> >> >       case _ => println("Exception!")
> >> >     }
> >> > }
> >> >
> >> > class MyProducer(brokerList: String) {
> >> >   val sync = true
> >> >   val requestRequiredAcks = "-1"
> >> >
> >> >   val props = new Properties()
> >> >   props.put("metadata.broker.list", brokerList)
> >> >   props.put("producer.type", if(sync) "sync" else "async")
> >> >   props.put("request.required.acks", requestRequiredAcks)
> >> >   props.put("key.serializer.class", classOf[StringEncoder].getName)
> >> >   props.put("serializer.class", classOf[StringEncoder].getName)
> >> >   props.put("message.send.max.retries", "0")
> >> >   props.put("request.timeout.ms", "2000")
> >> >
> >> >   val producer = new Producer[AnyRef, AnyRef](new
> ProducerConfig(props))
> >> >
> >> >   def send(topic: String, messages: List[String]) = {
> >> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
> >> >     for (message <- messages) {
> >> >       requests += new KeyedMessage(topic, null, message, message)
> >> >     }
> >> >     producer.send(requests)
> >> >   }
> >> > }
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>

Re: producer exceptions when broker dies

Posted by Kane Kane <ka...@gmail.com>.
Or, to rephrase it more generally, is there a way to know exactly if
message was committed or no?


On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <ka...@gmail.com> wrote:

> Hello Guozhang,
>
> My partitions are split almost evenly between broker, so, yes - broker
> that I shutdown is the leader for some of them. Does it mean i can get an
> exception and data is still being written? Is there any setting on the
> broker where i can control this? I.e. can i make broker replication timeout
> shorter than producer timeout, so i can ensure if i get an exception data
> is not being committed?
>
> Thanks.
>
>
> On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <wa...@gmail.com>wrote:
>
>> Hello Kane,
>>
>> As discussed in the other thread, even if a timeout response is sent back
>> to the producer, the message may still be committed.
>>
>> Did you shut down the leader broker of the partition or a follower broker?
>>
>> Guozhang
>>
>> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <ka...@gmail.com> wrote:
>>
>> > I have cluster of 3 kafka brokers. With the following script I send some
>> > data to kafka and in the middle do the controlled shutdown of 1 broker.
>> All
>> > 3 brokers are ISR before I start sending. When i shutdown the broker i
>> get
>> > a couple of exceptions and I expect data shouldn't be written. Say, I
>> send
>> > 1500 lines and get 50 exceptions. I expect to consume 1450 lines, but
>> > instead i always consume more, i.e. 1480 or 1490. I want to decide if I
>> > want to retry sending myself, not using message.send.max.retries. But
>> looks
>> > like if I retry sending if there is an exception - I will end up with
>> > duplicates. Is there anything I'm doing wrong or having wrong
>> assumptions
>> > about kafka?
>> >
>> > Thanks.
>> >
>> > val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,
>> > 10.80.42.156:9092")
>> > var count = 0
>> > for(line <- Source.fromFile(file).getLines()){
>> >     try {
>> >       prod.send("benchmark", buffer.toList)
>> >       count += 1
>> >       println("sent %s", count)
>> >     } catch {
>> >       case _ => println("Exception!")
>> >     }
>> > }
>> >
>> > class MyProducer(brokerList: String) {
>> >   val sync = true
>> >   val requestRequiredAcks = "-1"
>> >
>> >   val props = new Properties()
>> >   props.put("metadata.broker.list", brokerList)
>> >   props.put("producer.type", if(sync) "sync" else "async")
>> >   props.put("request.required.acks", requestRequiredAcks)
>> >   props.put("key.serializer.class", classOf[StringEncoder].getName)
>> >   props.put("serializer.class", classOf[StringEncoder].getName)
>> >   props.put("message.send.max.retries", "0")
>> >   props.put("request.timeout.ms", "2000")
>> >
>> >   val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
>> >
>> >   def send(topic: String, messages: List[String]) = {
>> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
>> >     for (message <- messages) {
>> >       requests += new KeyedMessage(topic, null, message, message)
>> >     }
>> >     producer.send(requests)
>> >   }
>> > }
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>

Re: producer exceptions when broker dies

Posted by Kane Kane <ka...@gmail.com>.
Hello Guozhang,

My partitions are split almost evenly between broker, so, yes - broker that
I shutdown is the leader for some of them. Does it mean i can get an
exception and data is still being written? Is there any setting on the
broker where i can control this? I.e. can i make broker replication timeout
shorter than producer timeout, so i can ensure if i get an exception data
is not being committed?

Thanks.


On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Hello Kane,
>
> As discussed in the other thread, even if a timeout response is sent back
> to the producer, the message may still be committed.
>
> Did you shut down the leader broker of the partition or a follower broker?
>
> Guozhang
>
> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <ka...@gmail.com> wrote:
>
> > I have cluster of 3 kafka brokers. With the following script I send some
> > data to kafka and in the middle do the controlled shutdown of 1 broker.
> All
> > 3 brokers are ISR before I start sending. When i shutdown the broker i
> get
> > a couple of exceptions and I expect data shouldn't be written. Say, I
> send
> > 1500 lines and get 50 exceptions. I expect to consume 1450 lines, but
> > instead i always consume more, i.e. 1480 or 1490. I want to decide if I
> > want to retry sending myself, not using message.send.max.retries. But
> looks
> > like if I retry sending if there is an exception - I will end up with
> > duplicates. Is there anything I'm doing wrong or having wrong assumptions
> > about kafka?
> >
> > Thanks.
> >
> > val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,
> > 10.80.42.156:9092")
> > var count = 0
> > for(line <- Source.fromFile(file).getLines()){
> >     try {
> >       prod.send("benchmark", buffer.toList)
> >       count += 1
> >       println("sent %s", count)
> >     } catch {
> >       case _ => println("Exception!")
> >     }
> > }
> >
> > class MyProducer(brokerList: String) {
> >   val sync = true
> >   val requestRequiredAcks = "-1"
> >
> >   val props = new Properties()
> >   props.put("metadata.broker.list", brokerList)
> >   props.put("producer.type", if(sync) "sync" else "async")
> >   props.put("request.required.acks", requestRequiredAcks)
> >   props.put("key.serializer.class", classOf[StringEncoder].getName)
> >   props.put("serializer.class", classOf[StringEncoder].getName)
> >   props.put("message.send.max.retries", "0")
> >   props.put("request.timeout.ms", "2000")
> >
> >   val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
> >
> >   def send(topic: String, messages: List[String]) = {
> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
> >     for (message <- messages) {
> >       requests += new KeyedMessage(topic, null, message, message)
> >     }
> >     producer.send(requests)
> >   }
> > }
> >
>
>
>
> --
> -- Guozhang
>

Re: producer exceptions when broker dies

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Kane,

As discussed in the other thread, even if a timeout response is sent back
to the producer, the message may still be committed.

Did you shut down the leader broker of the partition or a follower broker?

Guozhang

On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <ka...@gmail.com> wrote:

> I have cluster of 3 kafka brokers. With the following script I send some
> data to kafka and in the middle do the controlled shutdown of 1 broker. All
> 3 brokers are ISR before I start sending. When i shutdown the broker i get
> a couple of exceptions and I expect data shouldn't be written. Say, I send
> 1500 lines and get 50 exceptions. I expect to consume 1450 lines, but
> instead i always consume more, i.e. 1480 or 1490. I want to decide if I
> want to retry sending myself, not using message.send.max.retries. But looks
> like if I retry sending if there is an exception - I will end up with
> duplicates. Is there anything I'm doing wrong or having wrong assumptions
> about kafka?
>
> Thanks.
>
> val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,
> 10.80.42.156:9092")
> var count = 0
> for(line <- Source.fromFile(file).getLines()){
>     try {
>       prod.send("benchmark", buffer.toList)
>       count += 1
>       println("sent %s", count)
>     } catch {
>       case _ => println("Exception!")
>     }
> }
>
> class MyProducer(brokerList: String) {
>   val sync = true
>   val requestRequiredAcks = "-1"
>
>   val props = new Properties()
>   props.put("metadata.broker.list", brokerList)
>   props.put("producer.type", if(sync) "sync" else "async")
>   props.put("request.required.acks", requestRequiredAcks)
>   props.put("key.serializer.class", classOf[StringEncoder].getName)
>   props.put("serializer.class", classOf[StringEncoder].getName)
>   props.put("message.send.max.retries", "0")
>   props.put("request.timeout.ms", "2000")
>
>   val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
>
>   def send(topic: String, messages: List[String]) = {
>     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
>     for (message <- messages) {
>       requests += new KeyedMessage(topic, null, message, message)
>     }
>     producer.send(requests)
>   }
> }
>



-- 
-- Guozhang