Posted to jira@kafka.apache.org by "Rajini Sivaram (Jira)" <ji...@apache.org> on 2020/01/07 11:32:00 UTC

[jira] [Resolved] (KAFKA-8135) Kafka Producer deadlocked on flush call with intermittent broker unavailability

     [ https://issues.apache.org/jira/browse/KAFKA-8135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-8135.
-----------------------------------
    Resolution: Duplicate

> Kafka Producer deadlocked on flush call with intermittent broker unavailability
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-8135
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8135
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 2.1.0
>            Reporter: Guozhang Wang
>            Assignee: Rajini Sivaram
>            Priority: Major
>
> In KIP-91 we added the config {{delivery.timeout.ms}} to replace {{retries}}, and its default value is 2 minutes. We've observed that when it is set to MAX_VALUE (e.g. in Kafka Streams, when EOS is turned on), the {{producer.flush}} call can sometimes block while its destination brokers are undergoing transient unavailability:
> {code}
> java.lang.Thread.State: WAITING (parking)
>     at jdk.internal.misc.Unsafe.park(java.base@10.0.2/Native Method)
>     - parking to wait for  <0x00000006aeb21a00> (a java.util.concurrent.CountDownLatch$Sync)
>     at java.util.concurrent.locks.LockSupport.park(java.base@10.0.2/Unknown Source)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@10.0.2/Unknown Source)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@10.0.2/Unknown Source)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@10.0.2/Unknown Source)
>     at java.util.concurrent.CountDownLatch.await(java.base@10.0.2/Unknown Source)
>     at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:693)
>     at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1066)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:259)
>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:520)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:470)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:458)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
>     at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> {code}
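> For context, a minimal sketch of a producer configured this way (broker address, topic name, and serializer choices are illustrative, not taken from the report):
> {code}
> import java.util.Properties;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> // Default is 120000 (2 minutes); Integer.MAX_VALUE removes the bound on how
> // long a record may stay in the accumulator, so flush() can block indefinitely.
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
>
> KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> producer.send(new ProducerRecord<>("my-topic", "key", "value"));
> // Waits on a per-batch CountDownLatch (see ProduceRequestResult.await above);
> // if a response is never completed, this call never returns.
> producer.flush();
> {code}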
> Even after the broker went back to normal, producers would still be blocked. One suspicion is that when the broker cannot handle a request in time, the response is somehow dropped inside the Sender, and hence whoever is waiting on that response is blocked forever.
> We've observed such scenarios when 1) the broker transiently failed for a while, 2) the network was transiently partitioned, and 3) a bad broker config (e.g. ACLs) prevented the broker from handling requests for a while. A defensive sketch of a bounded alternative on the application side follows.
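> One defensive option on the application side (the timeout value and topic name here are illustrative assumptions, not part of this report) is to bound the wait per record via the Future returned by send(), rather than relying on the unbounded flush():
> {code}
> import java.util.concurrent.ExecutionException;
> import java.util.concurrent.Future;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.TimeoutException;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.RecordMetadata;
>
> // Reusing the producer from the sketch above: wait on each send's Future
> // with a timeout so a lost response cannot block the caller forever.
> Future<RecordMetadata> future =
>         producer.send(new ProducerRecord<>("my-topic", "key", "value"));
> try {
>     future.get(30, TimeUnit.SECONDS); // illustrative bound
> } catch (TimeoutException e) {
>     // Delivery did not complete in time: fail or retry instead of hanging.
> } catch (InterruptedException | ExecutionException e) {
>     // Interrupted, or the send failed terminally.
> }
> {code}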



--
This message was sent by Atlassian Jira
(v8.3.4#803005)