Posted to jira@kafka.apache.org by "Kshitij Wadhwa (Jira)" <ji...@apache.org> on 2020/11/18 19:29:00 UTC

[jira] [Commented] (KAFKA-10114) Kafka producer stuck after broker crash

    [ https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17234954#comment-17234954 ] 

Kshitij Wadhwa commented on KAFKA-10114:
----------------------------------------

We are still seeing this issue with version 2.6.0. Our app calls flush(), and it hangs forever when the brokers are down.
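Not a fix for the underlying bug, but one possible containment sketch (an assumption on our side, not something from this ticket): run the blocking flush() on a helper thread so the application can give up after a bounded wait instead of hanging forever.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Workaround sketch: run a potentially-blocking call such as
// producer::flush on a helper thread, and cap how long the caller waits.
public class BoundedFlush {
    public static boolean runWithTimeout(Runnable blockingCall,
                                         long timeout, TimeUnit unit) {
        ExecutorService ex = Executors.newSingleThreadExecutor();
        try {
            ex.submit(blockingCall).get(timeout, unit);
            return true;                 // call finished within the timeout
        } catch (TimeoutException e) {
            return false;                // gave up waiting (brokers likely down)
        } catch (InterruptedException | ExecutionException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            ex.shutdownNow();            // interrupt the helper thread
        }
    }
}
```

Usage would be something like runWithTimeout(producer::flush, 30, TimeUnit.SECONDS). In principle delivery.timeout.ms should already bound how long flush() waits, since expired batches complete exceptionally; this sketch just adds a hard cap at the application level for cases like the one reported here.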

> Kafka producer stuck after broker crash
> ---------------------------------------
>
>                 Key: KAFKA-10114
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10114
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 2.3.1, 2.4.1
>            Reporter: Itamar Benjamin
>            Priority: Critical
>
> Today two of our Kafka brokers crashed (in a cluster of 3 brokers), and producers were not able to send new messages. After the brokers started again, all producers resumed sending data except for a single one.
> At the beginning, the producer rejected all new messages with a TimeoutException:
>  
> {code:java}
>  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for incoming-mutable-RuntimeIIL-1:120000 ms has passed since batch creation
> {code}
>  
> Then after some time the exception changed to:
>  
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
> {code}
>  
>  
> jstack shows kafka-producer-network-thread is waiting to get producer id:
>  
> {code:java}
> "kafka-producer-network-thread | producer-1" #767 daemon prio=5 os_prio=0 cpu=63594017.16ms elapsed=1511219.38s tid=0x00007fffd8353000 nid=0x4fa4 sleeping [0x00007ff55c177000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>         at java.lang.Thread.sleep(java.base@11.0.1/Native Method)
>         at org.apache.kafka.common.utils.Utils.sleep(Utils.java:296)
>         at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:41)
>         at org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:565)
>         at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:306)
>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>         at java.lang.Thread.run(java.base@11.0.1/Thread.java:834)
>
>    Locked ownable synchronizers:
>         - None
> {code}
>  
> Digging into maybeWaitForProducerId(), it waits until some broker is ready (the awaitNodeReady function), which in turn calls leastLoadedNode() on NetworkClient. This iterates over all brokers and checks whether a request can be sent to each of them using canSendRequest().
> This is the code for canSendRequest():
>  
> {code:java}
> return connectionStates.isReady(node, now) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node)
> {code}
>  
>  
> Using some debugging tools, I saw this expression always evaluates to false, since the last part (canSendMore) is false.
>  
> This is the code for canSendMore:
> {code:java}
> public boolean canSendMore(String node) {
>     Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
>     return queue == null || queue.isEmpty() ||
>            (queue.peekFirst().send.completed() &&
>             queue.size() < this.maxInFlightRequestsPerConnection);
> }
> {code}
>  
>  
> I verified that
> {code:java}
> queue.peekFirst().send.completed()
> {code}
> is true, which leads to the livelock: since the request queues are full for all nodes, a new request to check broker availability and reconnect cannot be submitted.
>  
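The livelock described in the report can be modeled in a few lines of standalone Java. This is a simplified sketch, not Kafka's actual source: the names are stand-ins, and each deque element stands for the send.completed() flag of one in-flight request. It shows how full in-flight queues make canSendMore() false for every node, so leastLoadedNode() finds no candidate and the sender thread can only sleep and retry, exactly as the jstack output shows.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the NetworkClient logic quoted in the report.
public class LivelockDemo {
    static final int MAX_IN_FLIGHT = 5; // max.in.flight.requests.per.connection

    // Mirrors canSendMore(): a node is sendable if it has no queue, an
    // empty queue, or a completed head send with room left in the queue.
    static boolean canSendMore(Deque<Boolean> queue) {
        return queue == null
                || queue.isEmpty()
                || (queue.peekFirst() && queue.size() < MAX_IN_FLIGHT);
    }

    // Mirrors leastLoadedNode(): return some sendable node, or null.
    static String leastLoadedNode(Map<String, Deque<Boolean>> inFlight,
                                  List<String> nodes) {
        for (String node : nodes) {
            if (canSendMore(inFlight.get(node))) {
                return node;
            }
        }
        return null; // no sendable node -> Sender sleeps and retries forever
    }

    public static void main(String[] args) {
        Deque<Boolean> fullQueue = new ArrayDeque<>();
        for (int i = 0; i < MAX_IN_FLIGHT; i++) {
            fullQueue.add(true); // send completed, but response never arrived
        }
        Map<String, Deque<Boolean>> inFlight = Map.of(
                "broker-1", fullQueue,
                "broker-2", fullQueue,
                "broker-3", fullQueue);
        // Every queue is full, so no node is sendable: the livelock.
        System.out.println(leastLoadedNode(inFlight,
                List.of("broker-1", "broker-2", "broker-3"))); // prints null
    }
}
```

Note that the stuck state needs every broker's queue to be at the in-flight cap at once, which matches the report that the hang appeared only after multiple brokers crashed simultaneously.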



--
This message was sent by Atlassian Jira
(v8.3.4#803005)