Posted to jira@kafka.apache.org by "Mayur Patki (Jira)" <ji...@apache.org> on 2020/10/21 10:09:00 UTC

[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

    [ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17218213#comment-17218213 ] 

Mayur Patki commented on KAFKA-4669:
------------------------------------

Hi Team, 

We are encountering this issue on consumers as well. Is there a workaround for this?

We are using a spring-kafka consumer:

spring-kafka version: 2.2.14.RELEASE
kafka-clients version: 2.0.1
Kafka cluster version: 1.1.1

Consumer exception - cause: {} - java.lang.IllegalStateException: Correlation id for response (400801) does not match request (400737), request header: RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=3, clientId=consumer-7, correlationId=400737)
	at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:853)
	at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:638)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:757)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:519)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1247)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:742)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:748)
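
The exception above is raised by a correlation-id check. On a single connection, Kafka returns responses in the order the requests were sent, so each response is expected to answer the oldest outstanding request, and a mismatch is treated as an illegal state. The Java sketch below models that check under stated assumptions. CorrelationSketch, send and receive are hypothetical names, not Kafka's NetworkClient API, and the deque is a simplified stand-in for the client's in-flight request tracking. In this sketch the request is removed from the queue before the ids are compared, so after the exception it is no longer tracked. This mirrors the situation described in the issue quoted below.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of per-connection request/response correlation.
// Not Kafka's actual NetworkClient; all names here are illustrative only.
public class CorrelationSketch {

    // Correlation ids of requests sent on one connection, oldest first.
    private final Deque<Integer> inFlight = new ArrayDeque<>();
    private int nextCorrelationId = 0;

    // Send a request: assign the next correlation id and record it as in flight.
    public int send() {
        int id = nextCorrelationId++;
        inFlight.addLast(id);
        return id;
    }

    // Handle a response: it must answer the oldest in-flight request.
    // The request is removed BEFORE the comparison, so a mismatch leaves it untracked.
    public int receive(int responseCorrelationId) {
        Integer requestId = inFlight.pollFirst();
        if (requestId == null) {
            throw new IllegalStateException(
                    "No in-flight request for response (" + responseCorrelationId + ")");
        }
        if (requestId != responseCorrelationId) {
            throw new IllegalStateException("Correlation id for response (" + responseCorrelationId
                    + ") does not match request (" + requestId + ")");
        }
        return requestId;
    }

    public static void main(String[] args) {
        CorrelationSketch conn = new CorrelationSketch();
        int first = conn.send();   // correlation id 0
        int second = conn.send();  // correlation id 1
        conn.receive(first);       // matches the oldest request
        try {
            conn.receive(second + 1); // unexpected id for the oldest remaining request
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running main prints `Correlation id for response (2) does not match request (1)`, the same shape as the message in the consumer log above.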

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4669
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4669
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.9.0.1
>            Reporter: Cheng Ju
>            Assignee: Rajini Sivaram
>            Priority: Critical
>              Labels: reliability
>             Fix For: 0.11.0.1, 1.0.0
>
>
> There is no try/catch in NetworkClient.handleCompletedReceives. If an exception is thrown after inFlightRequests.completeNext(source), the corresponding RecordBatch's done() is never called, so KafkaProducer.flush hangs waiting on that RecordBatch.
> I've checked the 0.10 code and believe this bug also exists in the 0.10 versions.
> A real case: first, a correlation id mismatch exception occurs:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does not match request (703764)
> 	at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
> 	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
> 	at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread hanging at:
> 	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> 	at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
> 	at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
> 	at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
> 	at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
> 	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> 	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> 	at java.lang.Thread.run(Thread.java:745)
> client code 
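
The mechanism described in the issue can be illustrated with a minimal Java sketch, assuming a flush() caller blocks on a per-batch CountDownLatch (the jstack output above shows ProduceRequestResult.await blocked in CountDownLatch.await). FlushHangSketch, Batch, handleResponseBuggy and handleResponseFixed are hypothetical names, not Kafka classes. The buggy path removes the batch from the in-flight queue and then throws before completing it. The fixed path wraps the post-removal work so the batch is always completed, exceptionally if needed. This is one way to close the gap, not necessarily the change that shipped in 0.11.0.1 / 1.0.0.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the hang described in KAFKA-4669.
// Batch stands in for a RecordBatch whose result a flush() caller waits on; it is NOT Kafka's class.
public class FlushHangSketch {

    static final class Batch {
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile RuntimeException error;

        // Mark the batch finished; e is null on success.
        void complete(RuntimeException e) {
            error = e;
            done.countDown();
        }

        // Returns true if the batch completed within the timeout.
        boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return done.await(timeout, unit);
        }

        RuntimeException error() { return error; }
    }

    private final Deque<Batch> inFlight = new ArrayDeque<>();

    Batch send() {
        Batch b = new Batch();
        inFlight.addLast(b);
        return b;
    }

    // Buggy: the batch is removed, then parsing throws before complete() runs.
    void handleResponseBuggy() {
        Batch b = inFlight.pollFirst();
        parseResponse();   // throws, so the next line is never reached
        b.complete(null);
    }

    // Fixed: any failure after removal still completes the batch exceptionally.
    void handleResponseFixed() {
        Batch b = inFlight.pollFirst();
        try {
            parseResponse();
            b.complete(null);
        } catch (RuntimeException e) {
            b.complete(e);
        }
    }

    // Stand-in for a parsing step whose correlation-id check fails.
    private static void parseResponse() {
        throw new IllegalStateException("Correlation id for response does not match request");
    }

    public static void main(String[] args) throws InterruptedException {
        FlushHangSketch client = new FlushHangSketch();

        Batch lost = client.send();
        try {
            client.handleResponseBuggy();
        } catch (IllegalStateException ignored) {
            // In the issue, this surfaces as "Uncaught error in kafka producer I/O thread".
        }
        // An unbounded await() here would block forever; the timeout keeps the demo finite.
        System.out.println("buggy completed: " + lost.await(100, TimeUnit.MILLISECONDS));

        Batch kept = client.send();
        client.handleResponseFixed();
        System.out.println("fixed completed: " + kept.await(100, TimeUnit.MILLISECONDS)
                + ", error: " + kept.error().getMessage());
    }
}
```

Running main prints `buggy completed: false` followed by `fixed completed: true, error: Correlation id for response does not match request`. The bounded await exists only so the demo terminates; an unbounded wait on the lost batch never returns, which corresponds to the hang reported above.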


