Posted to jira@kafka.apache.org by "Nick Travers (JIRA)" <ji...@apache.org> on 2018/02/06 18:05:00 UTC

[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

    [ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354259#comment-16354259 ] 

Nick Travers commented on KAFKA-4669:
-------------------------------------

Chiming in again to note that we're still running into this issue intermittently. The failure mode is the same, with a BufferUnderflowException and stack trace similar to what I posted above.

For some additional context, when this occurs it ultimately leads to a JVM that cannot exit, because a thread is waiting on a latch that will never be counted down. Here's the hung thread:
{code:java}
"async-message-sender-0" #120 daemon prio=5 os_prio=0 tid=0x00007f30b4003000 nid=0x195a1 waiting on condition [0x00007f3105ce1000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@9/Native Method)
	- parking to wait for  <0x00000007852b1b68> (a java.util.concurrent.CountDownLatch$Sync)
	at java.util.concurrent.locks.LockSupport.park(java.base@9/LockSupport.java:194)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@9/AbstractQueuedSynchronizer.java:871)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@9/AbstractQueuedSynchronizer.java:1024)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@9/AbstractQueuedSynchronizer.java:1331)
	at java.util.concurrent.CountDownLatch.await(java.base@9/CountDownLatch.java:232)
	at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:61)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
	at com.squareup.core.concurrent.Futures2.getAll(Futures2.java:462)
	at com.squareup.kafka.ng.producer.KafkaProducer.sendMessageBatch(KafkaProducer.java:214)
	- locked <0x0000000728c71998> (a com.squareup.kafka.ng.producer.KafkaProducer)
	at com.squareup.kafka.ng.producer.BufferedKafkaProducer$AsyncBatchMessageSender.sendBatch(BufferedKafkaProducer.java:585)
	at com.squareup.kafka.ng.producer.BufferedKafkaProducer$AsyncBatchMessageSender.run(BufferedKafkaProducer.java:536)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@9/ThreadPoolExecutor.java:1167)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@9/ThreadPoolExecutor.java:641)
	at java.lang.Thread.run(java.base@9/Thread.java:844)
{code}
[Here is the latch|https://github.com/apache/kafka/blob/0.11.0.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java#L34] in the ProduceRequestResult that is never counted down. I assume that the network thread is responsible for counting it down, but if that thread crashes for whatever reason, it never gets a chance to call CountDownLatch#countDown.
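
To illustrate the mechanism (this is a JDK-only sketch, not Kafka's code; the class and method names are hypothetical): a thread that throws before reaching countDown leaves any waiter parked, whereas counting down in a finally block releases the waiter even when the handler fails.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchReleaseSketch {
    /**
     * Starts a stand-in "network thread" whose response handler always throws,
     * then reports whether a waiter on the latch was released within 200 ms.
     */
    public static boolean waiterReleased(boolean countDownInFinally) {
        CountDownLatch latch = new CountDownLatch(1);
        // Hypothetical handler that fails, standing in for a crash in the I/O thread.
        Runnable handleResponse = () -> {
            throw new IllegalStateException("simulated failure while handling a response");
        };
        Thread io = new Thread(() -> {
            if (countDownInFinally) {
                try {
                    handleResponse.run();
                } finally {
                    latch.countDown(); // runs even though the handler threw
                }
            } else {
                handleResponse.run();
                latch.countDown(); // never reached: the handler threw first
            }
        });
        // Swallow the simulated exception so it does not clutter stderr.
        io.setUncaughtExceptionHandler((thread, error) -> { });
        io.start();
        try {
            // Timed await so the sketch itself cannot hang.
            return latch.await(200, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded countDown released waiter: " + waiterReleased(false));
        System.out.println("finally countDown released waiter:   " + waiterReleased(true));
    }
}
```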

Arguably, we should be using a combination of daemon threads and the timed version of Future#get, but it _feels_ like something that could be fixed in the producer client, even if only to ensure that failed ProduceRequestResults can eventually be GC'd, which can't happen while another thread is hung waiting on the latch.
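
For reference, the caller-side workaround might look roughly like the sketch below. It uses only the JDK: a CompletableFuture that is never completed stands in for the Future<RecordMetadata> returned by KafkaProducer#send, and the helper name completedWithin is hypothetical. It only bounds the caller's wait; it does not fix the underlying problem in the client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedGetSketch {
    /**
     * Waits for the future with a bound. Returns true only if it completed
     * normally within the timeout; false on timeout, failure, or interrupt.
     */
    public static boolean completedWithin(Future<?> future, long timeout, TimeUnit unit) {
        try {
            future.get(timeout, unit); // timed variant: cannot park forever
            return true;
        } catch (TimeoutException e) {
            return false; // the result never arrived; the caller can log, retry, or shut down
        } catch (ExecutionException e) {
            return false; // the operation itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-in for a send whose result is never completed by the I/O thread.
        Future<String> neverCompleted = new CompletableFuture<>();
        System.out.println(completedWithin(neverCompleted, 100, TimeUnit.MILLISECONDS));

        // Stand-in for a send that was acknowledged.
        Future<String> acknowledged = CompletableFuture.completedFuture("ok");
        System.out.println(completedWithin(acknowledged, 100, TimeUnit.MILLISECONDS));
    }
}
```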

cc: [~rsivaram] [~hachikuji]

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4669
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4669
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.9.0.1
>            Reporter: Cheng Ju
>            Assignee: Rajini Sivaram
>            Priority: Critical
>              Labels: reliability
>             Fix For: 0.11.0.1, 1.0.0
>
>
> There is no try/catch in NetworkClient.handleCompletedReceives. If an exception is thrown after inFlightRequests.completeNext(source), then the corresponding RecordBatch's done will never get called, and KafkaProducer.flush will hang on this RecordBatch.
> I've checked the 0.10 code and think this bug also exists in the 0.10 versions.
> A real case: first, a correlation id mismatch exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does not match request (703764)
> 	at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
> 	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
> 	at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
> 	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> 	at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
> 	at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
> 	at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
> 	at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
> 	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> 	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> 	at java.lang.Thread.run(Thread.java:745)
> client code 


