Posted to jira@kafka.apache.org by "Nick Travers (JIRA)" <ji...@apache.org> on 2017/11/28 18:56:00 UTC

[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

    [ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269266#comment-16269266 ] 

Nick Travers commented on KAFKA-4669:
-------------------------------------

We hit this again in production, running 0.11.0.1. Same symptoms as previously reported:

{code}
2017-11-28 09:18:20,674 apa158.sjc2b.square kafka-producer-network-thread | producer-2 Uncaught error in kafka producer I/O thread: 
java.lang.IllegalStateException: Correlation id for response (1835513) does not match request (1835503), request header: {api_key=0,api_version=3,correlation_id=1835503,client_id=producer-2}
	at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:752)
	at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:561)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:657)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:442)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
	at java.lang.Thread.run(Thread.java:748)
{code}

A side effect was that the affected JVM was effectively deadlocked: a thread was waiting for a send to complete (which awaits on a latch), but that could never happen because the I/O thread had presumably crashed:

{code}
"async-message-sender-0" #1761 daemon prio=5 os_prio=0 tid=0x00007f3f04006800 nid=0x4356a waiting on condition [0x00007f3de9425000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000075a5e9140> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
        at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:61)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
        at com.squareup.core.concurrent.Futures2.getAll(Futures2.java:462)
        at com.squareup.kafka.ng.producer.KafkaProducer.sendMessageBatch(KafkaProducer.java:214)
        - locked <0x0000000734fc4dc0> (a com.squareup.kafka.ng.producer.KafkaProducer)
        at com.squareup.kafka.ng.producer.BufferedKafkaProducer$AsyncBatchMessageSender.sendBatch(BufferedKafkaProducer.java:585)
        at com.squareup.kafka.ng.producer.BufferedKafkaProducer$AsyncBatchMessageSender.run(BufferedKafkaProducer.java:536)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}
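
As a caller-side stopgap, a bounded wait would at least keep a thread like the one above from parking forever. The following is only a minimal sketch: BoundedSendWait and Outcome are hypothetical names (not part of Kafka or of our wrapper), and it relies only on the standard java.util.concurrent.Future.get(timeout, unit) call, which the future returned by a send also supports. It does not fix the underlying bug; it only turns an indefinite hang into a detectable timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration; not part of Kafka.
final class BoundedSendWait {

    enum Outcome { COMPLETED, FAILED, TIMED_OUT, INTERRUPTED }

    // Waits for a send result, but never longer than the given timeout.
    static Outcome await(Future<?> sendResult, long timeout, TimeUnit unit) {
        try {
            sendResult.get(timeout, unit);
            return Outcome.COMPLETED;
        } catch (TimeoutException e) {
            // The result never arrived, e.g. because the I/O thread is gone.
            return Outcome.TIMED_OUT;
        } catch (ExecutionException e) {
            // The send completed, but with an error.
            return Outcome.FAILED;
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for the caller.
            Thread.currentThread().interrupt();
            return Outcome.INTERRUPTED;
        }
    }

    public static void main(String[] args) {
        // A future that is never completed stands in for a send whose
        // completion latch is never counted down.
        Future<String> stuck = new CompletableFuture<>();
        System.out.println(await(stuck, 100, TimeUnit.MILLISECONDS));

        Future<String> done = CompletableFuture.completedFuture("ok");
        System.out.println(await(done, 100, TimeUnit.MILLISECONDS));
    }
}
```

On TIMED_OUT the caller could log, alert, or close and recreate the producer; which of those is appropriate depends on the application.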

This doesn't look easy to reproduce on our side, so I'm wondering what the best course of action is here. Is it worth reopening this ticket, [~ijuma]?

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4669
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4669
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.9.0.1
>            Reporter: Cheng Ju
>            Assignee: Rajini Sivaram
>            Priority: Critical
>              Labels: reliability
>             Fix For: 0.11.0.1, 1.0.0
>
>
> There is no try/catch in NetworkClient.handleCompletedReceives.  If an exception is thrown after inFlightRequests.completeNext(source), then the corresponding RecordBatch's done will never be called, and KafkaProducer.flush will hang on this RecordBatch.
> I've checked the 0.10 code and think this bug also exists in the 0.10 versions.
> A real case.  First, a correlation id mismatch exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does not match request (703764)
> 	at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
> 	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
> 	at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
> 	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> 	at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
> 	at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
> 	at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
> 	at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
> 	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> 	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> 	at java.lang.Thread.run(Thread.java:745)
> client code 
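
The failure mode in the quoted description can be reproduced in a toy model. This is only a sketch under stated assumptions: InFlightModel, Request, and the two handleResponse methods are hypothetical and are neither Kafka's NetworkClient code nor the actual fix. The unsafe variant dequeues the request and then throws before releasing its latch; the safer variant records the error and always releases the waiter.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Toy model only; all names here are hypothetical, not Kafka classes.
final class InFlightModel {

    static final class Request {
        final int correlationId;
        final CountDownLatch done = new CountDownLatch(1);
        volatile RuntimeException error;

        Request(int correlationId) {
            this.correlationId = correlationId;
        }
    }

    private final Deque<Request> inFlight = new ArrayDeque<>();

    Request send(int correlationId) {
        Request r = new Request(correlationId);
        inFlight.addLast(r);
        return r;
    }

    // Unsafe: the request is already dequeued when the check throws,
    // so its latch is never counted down and any waiter parks forever.
    void handleResponseUnsafe(int responseCorrelationId) {
        Request r = next();
        check(r, responseCorrelationId);
        r.done.countDown();
    }

    // Safer: record the failure and release the waiter on every path.
    void handleResponseSafe(int responseCorrelationId) {
        Request r = next();
        try {
            check(r, responseCorrelationId);
        } catch (RuntimeException e) {
            r.error = e;
            throw e;
        } finally {
            r.done.countDown();
        }
    }

    private Request next() {
        Request r = inFlight.pollFirst();
        if (r == null) {
            throw new IllegalStateException("No in-flight request");
        }
        return r;
    }

    private static void check(Request r, int responseCorrelationId) {
        if (r.correlationId != responseCorrelationId) {
            throw new IllegalStateException("Correlation id for response (" + responseCorrelationId
                    + ") does not match request (" + r.correlationId + ")");
        }
    }

    // Bounded wait so the demo itself cannot hang.
    static boolean isReleasedWithin(Request r, long millis) {
        try {
            return r.done.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        InFlightModel model = new InFlightModel();

        Request a = model.send(1);
        try {
            model.handleResponseUnsafe(2);
        } catch (IllegalStateException e) {
            // Mismatch detected, but the waiter on 'a' is left parked.
        }
        System.out.println("unsafe released: " + isReleasedWithin(a, 100));

        Request b = model.send(3);
        try {
            model.handleResponseSafe(4);
        } catch (IllegalStateException e) {
            // Mismatch detected, and the waiter on 'b' has been released.
        }
        System.out.println("safe released: " + isReleasedWithin(b, 100));
    }
}
```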



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)