Posted to users@kafka.apache.org by Michal Turek <tu...@avast.com> on 2016/09/09 15:06:30 UTC

Asynchronous non-blocking Kafka producer without losing messages

Hi there,

We are preparing to upgrade our Kafka cluster and applications to Kafka
0.10.x and are having difficulty configuring the *Kafka producer to be
asynchronous and reliably non-blocking*.

As I understand KIP-19 (1), the main intention of the Kafka developers
was to put a hard limit on how long the producer may block
(max.block.ms) rather than to make the producer fully non-blocking, as
one would expect from an asynchronous API with futures and callbacks.

The issue with max.block.ms is that a value of 0 causes an immediate
TimeoutException if the topic metadata is not available, instead of
waiting for it asynchronously and then completing the future that was
already returned. The TimeoutException is typical for the first requests
after startup, but probably (not checked) also occurs when topics or
partitions are added or removed, when brokers restart, etc. Any value
other than 0 isn't usable for the non-blocking requirement. What to
choose? 1 minute (default), 1 second, 100 ms, 10 ms? Only 0 would be
right.

I found multiple Jira tickets that describe this issue (2); some of
them are closed, but none are resolved.

My questions are:
- Did I miss something obvious?
- Has anyone worked around this issue, and if so, how?

I'm considering implementing a workaround in our applications, probably
based on ThreadPoolExecutor.setRejectedExecutionHandler() (3) or
BlockingQueue.offer() (4), but I wanted to ask here first so that we can
find and reuse an existing solution.
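
To make the BlockingQueue.offer() idea concrete, here is a minimal
sketch of that kind of workaround. It is only an illustration under
assumptions: the class NonBlockingSender and its trySend() method are
hypothetical names, and blockingSend stands in for whatever actually
calls the Kafka producer (for example a lambda around producer.send()).
Callers never block; a single background thread absorbs any blocking.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical wrapper (not part of Kafka): callers hand messages over
 * through a bounded queue and never block; one background thread performs
 * the possibly blocking send.
 */
final class NonBlockingSender<T> implements AutoCloseable {
    private final BlockingQueue<T> queue;
    private final Consumer<T> blockingSend; // assumption: wraps producer.send()
    private final Thread worker;
    private volatile boolean running = true;

    NonBlockingSender(int capacity, Consumer<T> blockingSend) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.blockingSend = blockingSend;
        this.worker = new Thread(this::drain, "non-blocking-sender");
        this.worker.setDaemon(true);
        this.worker.start();
    }

    /**
     * Returns immediately. A result of false means the message was NOT
     * accepted (queue full or sender closed); the caller decides whether
     * to drop it, log it, or store it elsewhere.
     */
    boolean trySend(T message) {
        return running && queue.offer(message);
    }

    /** Worker loop: the only place where blocking is allowed. */
    private void drain() {
        try {
            while (running || !queue.isEmpty()) {
                T message = queue.poll(100, TimeUnit.MILLISECONDS);
                if (message == null) {
                    continue; // nothing queued yet, re-check the running flag
                }
                try {
                    blockingSend.accept(message);
                } catch (RuntimeException e) {
                    // A real application would retry or report the failure.
                    System.err.println("send failed: " + e);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops accepting messages and waits until the queue has been drained.
     * Should be called only after all callers of trySend() have stopped.
     */
    @Override
    public void close() {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Note that this only moves the problem: when the queue is full, trySend()
returns false and the message is lost unless the caller handles it, so
the queue capacity and the rejection policy still have to be chosen
carefully.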

Thanks,
Michal

(1) 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
(2) https://issues.apache.org/jira/browse/KAFKA-1835, 
https://issues.apache.org/jira/browse/KAFKA-2137, 
https://issues.apache.org/jira/browse/KAFKA-3236, 
https://issues.apache.org/jira/browse/KAFKA-3450
(3) 
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html#setRejectedExecutionHandler-java.util.concurrent.RejectedExecutionHandler-
(4) 
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html#offer-E-