Posted to users@kafka.apache.org by Petr Novak <os...@gmail.com> on 2017/05/02 13:38:33 UTC

Does queue.time still apply for the new Producer?

The documentation reads as:

"As events enter a queue, they are buffered in a queue, until either
queue.time or batch.size is reached. A background thread
(kafka.producer.async.ProducerSendThread) dequeues the batch of data and
lets the kafka.producer.EventHandler serialize and send the data to the
appropriate kafka broker partition."

 

That suggests the answer is yes, but doesn't it apply only to the old
producer? queue.time is not among the Producer configuration options listed
in the same documentation, and I can't find it in the ProducerConfig source
<https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java>.
Looking into the source code, there is the deprecated ProducerSendThread
<https://github.com/apache/kafka/blob/128d0ff91d84a3a1f5a5237133f9ec01caf18d66/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala>,
which uses queueTime. But at first glance I can't see where it is used in
the new DefaultEventHandler
<https://github.com/apache/kafka/blob/128d0ff91d84a3a1f5a5237133f9ec01caf18d66/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala>,
or am I looking in the wrong place? The asyncSend method in Producer
<https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/producer/Producer.scala>
seems to use only queueEnqueueTimeoutMs.


I'm trying to figure out whether sending can be blocked by queue.time or
batch.size in the new producer. I thought it could not: that the sending
loop dequeues as fast as possible and creates batches of at most batch.size
from whatever is currently there, and that how fast it dequeues is limited
only by linger and max.in.flight.requests.per.connection. Or does it still
wait for either queue.time to elapse or batch.size to fill? I got confused
by the documentation, which suggests otherwise.



I can't wrap my head around how linger would help if the Producer waits for
queue.time or batch.size. What can I control with linger that I can't
control with the combination of the other two in this case? And how do they
all play together (if they all still apply)?

Many thanks for any clarification,
Petr


RE: Does queue.time still apply for the new Producer?

Posted by Petr Novak <os...@gmail.com>.
I have tried it with both the new and the old producer. I sent 2 messages to
a Kafka topic in sequence on program start.



queue.time seems to have no effect in either.



The new producer sends them immediately even with queue.time and batch.size
set very high. It blocks on linger.ms as expected.
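
The observation above is consistent with a simple rule: a batch in the new
producer goes out once it is full (batch.size, in bytes) or once it has
waited linger.ms, whichever comes first. Below is a minimal sketch of that
rule as a toy model. It is not Kafka's actual code; the Batch class and
is_sendable function are hypothetical names used only for illustration, and
the model ignores other triggers such as an explicit flush().

```python
from dataclasses import dataclass


@dataclass
class Batch:
    """Hypothetical stand-in for one per-partition batch in the new producer."""
    size_bytes: int  # bytes appended to the batch so far
    waited_ms: int   # time elapsed since the batch was created


def is_sendable(batch: Batch, batch_size: int, linger_ms: int) -> bool:
    """Toy rule: send when the batch is full OR has lingered long enough."""
    full = batch.size_bytes >= batch_size
    lingered = batch.waited_ms >= linger_ms
    return full or lingered


# Two small messages, huge batch.size, linger.ms = 0: sendable immediately.
print(is_sendable(Batch(size_bytes=100, waited_ms=0), 1_000_000, 0))
# Same batch with linger.ms = 5000: held back until 5000 ms have passed.
print(is_sendable(Batch(size_bytes=100, waited_ms=10), 1_000_000, 5000))
```

Under this model batch.size is only an upper bound, which is why setting it
very high does not delay sending on its own; linger.ms is the only setting
that introduces a wait.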


Interestingly, the old producer doesn't block on queue.time either. This
setting is not even described in the Kafka configs for the old producer. But
it does block on queue.buffering.max.ms, queue.buffering.max.messages and
batch.num.messages.
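
To make the old-producer behaviour concrete, here is a toy simulation of an
async send thread that flushes when either batch.num.messages have
accumulated or queue.buffering.max.ms has elapsed since the last flush. It
is a sketch under assumptions, not the real ProducerSendThread: flush_times
is a hypothetical helper, the thread is assumed to start at t = 0, and the
queue capacity (queue.buffering.max.messages) is not modelled.

```python
def flush_times(arrivals_ms, batch_num_messages, queue_buffering_max_ms):
    """Return (flush_time_ms, message_count) pairs for a toy send thread."""
    if queue_buffering_max_ms <= 0:
        raise ValueError("queue_buffering_max_ms must be positive")
    flushes = []
    pending = 0
    deadline = queue_buffering_max_ms  # first timer expiry after start at t=0
    for t in sorted(arrivals_ms):
        # Fire every timer expiry that happens before this message arrives.
        while deadline <= t:
            if pending:
                flushes.append((deadline, pending))
                pending = 0
            deadline += queue_buffering_max_ms
        pending += 1
        # Count-based flush: the batch is full, send now and restart the timer.
        if pending >= batch_num_messages:
            flushes.append((t, pending))
            pending = 0
            deadline = t + queue_buffering_max_ms
    # Anything left over goes out at the next timer expiry.
    if pending:
        flushes.append((deadline, pending))
    return flushes


# Two messages at start-up, large batch.num.messages, 5 s buffering time:
print(flush_times([0, 1], batch_num_messages=200, queue_buffering_max_ms=5000))
```

In this model the two start-up messages are held until the 5000 ms timer
fires, unless batch.num.messages is lowered to 2, in which case they go out
as soon as the second one arrives.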

 

To me it looks like queue.time comes out of nowhere in the Kafka
documentation. It is mentioned in only one place.

Regards,
Petr


