Posted to jira@kafka.apache.org by "Tim Fox (Jira)" <ji...@apache.org> on 2020/11/19 11:10:00 UTC

[jira] [Comment Edited] (KAFKA-10114) Kafka producer stuck after broker crash

    [ https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235343#comment-17235343 ] 

Tim Fox edited comment on KAFKA-10114 at 11/19/20, 11:09 AM:
-------------------------------------------------------------

> We are still seeing this issue with version 2.6.0. Our app calls flush and it hangs forever when brokers are down.

[Revised my answer as it was previously based on a misunderstanding of the current code]

Currently KafkaProducer.flush() will hang forever if there are queued batches and the brokers are lost and not restarted. The queued batches won't be timed out because there are no "ready" nodes, and the timeout logic currently runs only after a ready node has been obtained.

The expectation is that flush() completes with a TimeoutException if it does not complete successfully within the timeout specified by delivery.timeout.ms.
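
As a stopgap, applications can bound the wait themselves rather than relying on flush() to time out. A minimal sketch (assuming an already-configured producer; this is a workaround, not the proposed fix):

{code:java}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.KafkaProducer;

public final class BoundedFlush {
    // Runs producer.flush() on a separate thread and bounds the wait from the caller,
    // roughly mirroring delivery.timeout.ms (default 120000 ms). Returns true if the
    // flush finished in time, false if it is still blocked (e.g. no broker is ready).
    public static boolean flushWithTimeout(KafkaProducer<?, ?> producer, long timeoutMs)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> flushFuture = executor.submit(producer::flush);
        try {
            flushFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            flushFuture.cancel(true); // give up rather than hang forever
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
{code}

Note this doesn't release the stuck batches, it only stops the calling thread from blocking forever.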

 

[~ijuma] [~hachikuji] thoughts?

was (Author: purplefox):
> We are still seeing this issue with version 2.6.0. Our app calls flush and it hangs forever when brokers are down.

The current KafkaProducer.flush() method will indeed wait forever for the flush to complete, and a flush clearly cannot complete while the brokers are down. This seems like a reasonable default to me - we want to be sure that buffered messages aren't lost, yet we don't know how long it will take for the brokers to be restarted, so it's very hard to choose a default timeout - should it be 1 minute? 1 hour? 1 day?

However, without changing the API, perhaps we could allow for a flush timeout to be specified via a producer property? That way we could keep the default as "forever" but allow you to override it to a lower value.
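
For illustration only, application code under such a proposal might look like this ("flush.timeout.ms" is a hypothetical property name I'm using here; it does not exist in the current producer):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class FlushTimeoutConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Hypothetical property suggested above; the default could stay at
        // "forever" to preserve the current behaviour.
        props.put("flush.timeout.ms", "60000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.flush(); // would give up after 60s under the proposal above
        }
    }
}
{code}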

[~ijuma] [~hachikuji] thoughts?

> Kafka producer stuck after broker crash
> ---------------------------------------
>
>                 Key: KAFKA-10114
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10114
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 2.3.1, 2.4.1
>            Reporter: Itamar Benjamin
>            Priority: Critical
>
> Today two of our Kafka brokers crashed (cluster of 3 brokers), and producers were not able to send new messages. After the brokers started again, all producers resumed sending data except for a single one.
> At the beginning the producer rejected all new messages with a TimeoutException:
>  
> {code:java}
>  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for incoming-mutable-RuntimeIIL-1:120000 ms has passed since batch creation
> {code}
>  
> Then after some time the exception changed to:
>  
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
> {code}
>  
>  
> jstack shows the kafka-producer-network-thread is waiting to get a producer id:
>  
> {code:java}
> "kafka-producer-network-thread | producer-1" #767 daemon prio=5 os_prio=0 cpu=63594017.16ms elapsed=1511219.38s tid=0x00007fffd8353000 nid=0x4fa4 sleeping [0x00007ff55c177000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>         at java.lang.Thread.sleep(java.base@11.0.1/Native Method)
>         at org.apache.kafka.common.utils.Utils.sleep(Utils.java:296)
>         at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:41)
>         at org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:565)
>         at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:306)
>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>         at java.lang.Thread.run(java.base@11.0.1/Thread.java:834)
> 
>    Locked ownable synchronizers:
>         - None
> {code}
>  
> Digging into maybeWaitForProducerId(), it waits until some broker is ready (the awaitNodeReady function), which in turn calls leastLoadedNode() on NetworkClient. That method iterates over all brokers and checks whether a request can be sent to each of them using canSendRequest().
> This is the code for canSendRequest():
>  
> {code:java}
> return connectionStates.isReady(node, now) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node)
> {code}
>  
>  
> Using some debugging tools I saw this expression always evaluates to false, since the last part (canSendMore) is false.
>  
> This is the code for canSendMore:
> {code:java}
> public boolean canSendMore(String node) {
>     Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
>     return queue == null || queue.isEmpty() ||
>         (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
> }
> {code}
>  
>  
> I verified that
> {code:java}
> queue.peekFirst().send.completed()
> {code}
> is true, and that leads to the livelock - since the request queues are full for all nodes, a new request to check broker availability and reconnect cannot be submitted.
>  
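> To make the failure mode concrete, here is a small self-contained sketch of the same condition using toy stand-in types (these are not the actual NetworkClient/InFlightRequests classes): once the in-flight queue for a node is full, canSendMore() is false even though the head request's send has completed, so leastLoadedNode() never returns a candidate and maybeWaitForProducerId() retries forever.
>  
> {code:java}
> import java.util.ArrayDeque;
> import java.util.Deque;
> 
> public class InFlightLivelockSketch {
>     // Toy stand-in for NetworkClient.InFlightRequest: only tracks send completion.
>     static final class Request {
>         final boolean sendCompleted;
>         Request(boolean sendCompleted) { this.sendCompleted = sendCompleted; }
>     }
> 
>     // Same shape as InFlightRequests.canSendMore(): false once the queue is full,
>     // even when the head request's send has already completed.
>     static boolean canSendMore(Deque<Request> queue, int maxInFlightRequestsPerConnection) {
>         return queue == null || queue.isEmpty()
>                 || (queue.peekFirst().sendCompleted && queue.size() < maxInFlightRequestsPerConnection);
>     }
> 
>     public static void main(String[] args) {
>         int maxInFlight = 5;
>         Deque<Request> queue = new ArrayDeque<>();
>         for (int i = 0; i < maxInFlight; i++) {
>             queue.add(new Request(true)); // sends completed, but responses never arrived
>         }
>         // Prints "false": the node is never sendable, so no node is ever "ready".
>         System.out.println(canSendMore(queue, maxInFlight));
>     }
> }
> {code}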



--
This message was sent by Atlassian Jira
(v8.3.4#803005)