Posted to dev@kafka.apache.org by "Shannon Carey (JIRA)" <ji...@apache.org> on 2017/05/01 20:37:04 UTC

[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

    [ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15991488#comment-15991488 ] 

Shannon Carey commented on KAFKA-4669:
--------------------------------------

We encountered this exception with our 0.9.0.1 client too, but it was preceded by another exception:

{code}
2017-05-01 10:38:02:866 thread=kafka-producer-network-thread | producer-1, level=ERROR, logger=org.apache.kafka.clients.producer.internals.Sender, , message="Uncaught error in kafka producer I/O thread: "org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'brokers': Error reading field 'host': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]

2017-05-01 10:38:04:847 thread=kafka-producer-network-thread | producer-1, level=ERROR, logger=org.apache.kafka.clients.producer.internals.Sender, , message="Uncaught error in kafka producer I/O thread: "
java.lang.IllegalStateException: Correlation id for response (3623413) does not match request (3623406)
        at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
{code}

Assuming there's no workaround, it seems we will need to use metrics to detect this situation and react by restarting the application.

The producer should probably be fixed so that any unhandled exception in the Kafka internal threads is surfaced as an exception to user code, which can then react to it, instead of the producer silently ceasing to work. Or, if the problem is transient, the thread should recover.
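
Until something like that exists, one possible way for user code to notice the hang is to bound the flush call with a timeout. The following is only a minimal sketch under stated assumptions: BoundedFlush and flushWithTimeout are hypothetical names, not part of the Kafka client, and the Runnable stands in for a call such as producer::flush so that the example runs without a broker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the Kafka client API.
class BoundedFlush {

    /**
     * Runs flushAction on a daemon thread and waits at most timeoutMs for it.
     * Returns true if it completed, false if it timed out (a possible hang)
     * or if the calling thread was interrupted while waiting.
     */
    static boolean flushWithTimeout(Runnable flushAction, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-flush");
            t.setDaemon(true); // a stuck flush must not keep the JVM alive
            return t;
        });
        try {
            Future<?> pending = executor.submit(flushAction);
            pending.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // caller decides how to react, e.g. restart
        } catch (ExecutionException e) {
            throw new IllegalStateException("flush action failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow(); // interrupts the worker if it is still blocked
        }
    }

    public static void main(String[] args) {
        // Stand-in for a healthy flush: returns immediately.
        System.out.println(flushWithTimeout(() -> { }, 1000)); // prints true

        // Stand-in for the hang: waits on a latch that is never counted down.
        CountDownLatch neverDone = new CountDownLatch(1);
        System.out.println(flushWithTimeout(() -> {
            try {
                neverDone.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 200)); // prints false
    }
}
```

A false return only says that the flush did not finish in time. It does not repair the producer, so the application would still have to restart or otherwise recover.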

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4669
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4669
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.9.0.1
>            Reporter: Cheng Ju
>            Priority: Critical
>              Labels: reliability
>             Fix For: 0.11.0.0
>
>
> NetworkClient.handleCompletedReceives has no try/catch. If an exception is thrown after inFlightRequests.completeNext(source), the corresponding RecordBatch's done is never called, and KafkaProducer.flush hangs waiting on that RecordBatch.
> I've checked the 0.10 code and believe this bug exists in the 0.10 versions as well.
> A real case: first, a correlation id mismatch exception occurs:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does not match request (703764)
> 	at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
> 	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
> 	at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
> 	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> 	at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
> 	at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
> 	at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
> 	at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
> 	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> 	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> 	at java.lang.Thread.run(Thread.java:745)
> client code 
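
The failure mode described in the issue can be illustrated with a toy model. This is a sketch only: LostCompletionModel, InFlight, handleUnguarded and handleGuarded are hypothetical names, the code is not the Kafka implementation, and the try/finally variant is not necessarily the fix that was applied. It shows that once a request has been dequeued, an exception thrown before the completion signal leaves the waiter blocked, while a guarded handler always releases it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Toy model of the hang; all names here are hypothetical, not Kafka classes.
class LostCompletionModel {

    /** Stand-in for an in-flight request whose waiter blocks on a latch. */
    static final class InFlight {
        final CountDownLatch done = new CountDownLatch(1);
    }

    /** Simulates response handling that fails, e.g. on a correlation id mismatch. */
    static void parse(boolean corrupt) {
        if (corrupt) {
            throw new IllegalStateException("simulated correlation id mismatch");
        }
    }

    /** Unguarded: the request is dequeued first, so a parse failure loses the completion. */
    static void handleUnguarded(Deque<InFlight> inFlight, boolean corrupt) {
        InFlight request = inFlight.pollFirst(); // analogous to completeNext(source)
        parse(corrupt);                          // may throw
        request.done.countDown();                // skipped when parse throws
    }

    /** Guarded: the waiter is released whether or not parsing succeeds. */
    static void handleGuarded(Deque<InFlight> inFlight, boolean corrupt) {
        InFlight request = inFlight.pollFirst();
        try {
            parse(corrupt);
        } finally {
            request.done.countDown();
        }
    }

    /** Returns true if a waiter on the request is released within 100 ms. */
    static boolean waiterReleased(boolean guarded) {
        Deque<InFlight> inFlight = new ArrayDeque<>();
        InFlight request = new InFlight();
        inFlight.addLast(request);
        try {
            if (guarded) {
                handleGuarded(inFlight, true);
            } else {
                handleUnguarded(inFlight, true);
            }
        } catch (IllegalStateException e) {
            // Modeled on the logs above: the error is reported and the loop continues.
        }
        try {
            return request.done.await(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(waiterReleased(false)); // prints false: the waiter would hang
        System.out.println(waiterReleased(true));  // prints true: the waiter is released
    }
}
```

In the guarded variant a real client would also need to tell the waiter that the request failed rather than succeeded. The model leaves that out.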


