Posted to users@kafka.apache.org by Li Tao <ah...@gmail.com> on 2015/11/08 09:16:10 UTC

Re: question about async publisher blocking when broker is down.

Hi, according to my understanding, async mode does not cover your scenario.
It does not mean the producer buffers messages while the connection is lost
(you killed the broker). If the connection is down, the producer should detect
it as an exceptional condition and raise that exception to the application
level to handle.

Correct me if I am wrong.
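
For reference, here is a minimal sketch of the test loop under discussion,
using the old 0.8 javaapi producer and the properties quoted below (the topic
name, key format, and broker host come from that message; everything else is
assumed). Note that with producer.type=async, send() only enqueues the
message, so a broker failure surfaces in the ProducerSendThread log rather
than as an exception thrown from send():

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    // Minimal async-publisher test loop (sketch, not the original code).
    public class AsyncPublisherTest {
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put("metadata.broker.list", "cno-d-igoberman2:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("producer.type", "async");
            props.put("request.required.acks", "1");
            props.put("queue.buffering.max.messages", "200");
            // -1 means block the caller once the 200-message queue is full
            props.put("queue.enqueue.timeout.ms", "-1");

            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
            for (int i = 0; ; i++) {
                // With producer.type=async this call only enqueues; a broker
                // failure is reported by the background ProducerSendThread
                // (see the log in the quoted message), not thrown here.
                producer.send(new KeyedMessage<String, String>(
                        "test", Integer.toString(i),
                        System.currentTimeMillis() + ": " + i));
                Thread.sleep(1000);
            }
        }
    }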

On Sat, Oct 31, 2015 at 4:25 AM, Ilya Goberman <ig...@kcg.com> wrote:

> I am new to Kafka and apologize if this is already answered. I am testing
> simple async publisher behavior when the broker is down. I use Kafka version
> 0.8.2.2.
>
>
> I have set "queue.buffering.max.messages" to 200 and
> "queue.enqueue.timeout.ms" to -1. My understanding is that if
> "queue.enqueue.timeout.ms" is set to -1, the call to 'producer.send' should
> block once the queue of 200 messages is full. But this is not what I am
> seeing.
>
>
> My publisher has these properties:
>         Properties props = new Properties();
>         props.put("metadata.broker.list", "cno-d-igoberman2:9092");
>         props.put("serializer.class", "kafka.serializer.StringEncoder");
>         props.put("producer.type", "async");
>         props.put("partitioner.class",
> "com.kcg.kafka.test.SimplePartitioner");
>         props.put("request.required.acks", "1");
>         props.put("queue.buffering.max.messages", "200");
>         props.put("queue.enqueue.timeout.ms", "-1");
>
>
> This is the scenario I am testing:
>
> 1) start broker.
>
> 2) start publishing in a loop.
>
> 3) kill broker.
>
>
> At this point my producer keeps calling 'producer.send' without blocking
> (though it slows down considerably). I suspect that messages are being lost,
> but this is not what I want. Is this a known limitation of producers in
> Kafka?
>
> Any help in clarifying this would be appreciated. Also, I understand that
> the producer is being redesigned for the next release. When will it be
> available? Should I even bother with the current version?
>
> Thanks
>
>
> This is what I am seeing in the log:
>
>
> 2015-10-30 14:50:29 INFO  SyncProducer:68 - Disconnecting from
> cno-d-igoberman2:9092
> 2015-10-30 14:50:29 ERROR Utils$:106 - fetching topic metadata for topics
> [Set(test)] from broker [ArrayBuffer(id:0,host:cno-d-igoberman2,port:9092)]
> failed
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(test)] from broker [ArrayBuffer(id:0,host:cno-d-igoberman2,port:9092)]
> failed
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>     at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>     at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
>     at kafka.utils.Utils$.swallow(Utils.scala:172)
>     at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>     at kafka.utils.Utils$.swallowError(Utils.scala:45)
>     at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
>     at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>     at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>     at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>     at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>     at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>     at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> Caused by: java.nio.channels.ClosedChannelException
>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>     at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>     at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>     ... 12 more
> 2015-10-30 14:50:29 ERROR DefaultEventHandler:97 - Failed to send requests
> for topics test with correlation ids in [0,8]
> 2015-10-30 14:50:29 ERROR ProducerSendThread:103 - Error in handling batch
> of 5 events
> kafka.common.FailedToSendMessageException: Failed to send messages after 3
> tries.
>     at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
>     at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>     at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>     at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>     at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>     at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>     at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> 2015-10-30 14:50:29 TRACE ProducerSendThread:36 - Dequeued item for topic
> test, partition key: 5, data: 1446234628741: 5
> 2015-10-30 14:50:29 TRACE Producer:36 - Added to send queue an event:
> KeyedMessage(test,6,6,1446234629741: 6)
> 2015-10-30 14:50:29 TRACE Producer:36 - Remaining queue size: 200
>
>
>

Re: question about async publisher blocking when broker is down.

Posted by Damian Guy <da...@gmail.com>.
Hi,
If you are using the Scala producer then yes, it will drop messages. It will
retry up to the configured number of retries (message.send.max.retries,
3 by default) and then throw a FailedToSendMessageException. This is caught
in the ProducerSendThread and logged; you'd see something like:
"Error in handling batch of 10 events ..."

If you don't want to drop messages (who does?) then I suggest using the
sync producer and doing your own batching.

Cheers,
Damian
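
A minimal sketch of that approach, assuming the same 0.8 javaapi producer as
in the original message above: run with producer.type=sync, collect messages
into your own batch, and retry the whole batch when
FailedToSendMessageException is thrown. The batch size and back-off below are
placeholders, and re-sending a batch after a partial failure can duplicate
messages (at-least-once, not exactly-once).

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import kafka.common.FailedToSendMessageException;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    // Sync producer with hand-rolled batching and retry (sketch).
    public class BatchingSyncPublisher {
        private static final int BATCH_SIZE = 100;   // placeholder

        private final Producer<String, String> producer;
        private final List<KeyedMessage<String, String>> batch =
                new ArrayList<KeyedMessage<String, String>>();

        public BatchingSyncPublisher(String brokerList) {
            Properties props = new Properties();
            props.put("metadata.broker.list", brokerList);
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("producer.type", "sync");   // send() blocks until acked or it fails
            props.put("request.required.acks", "1");
            producer = new Producer<String, String>(new ProducerConfig(props));
        }

        public void publish(String topic, String key, String value) {
            batch.add(new KeyedMessage<String, String>(topic, key, value));
            if (batch.size() >= BATCH_SIZE) {
                flush();
            }
        }

        public void flush() {
            // Keep retrying the whole batch: nothing is silently dropped, at the
            // cost of blocking the caller while the broker is down.
            while (!batch.isEmpty()) {
                try {
                    producer.send(batch);   // javaapi Producer accepts a List
                    batch.clear();
                } catch (FailedToSendMessageException e) {
                    try {
                        Thread.sleep(1000); // crude back-off before retrying
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }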
