Posted to jira@kafka.apache.org by "GeordieMai (Jira)" <ji...@apache.org> on 2020/12/04 19:24:00 UTC

[jira] [Comment Edited] (KAFKA-10790) Detect/Prevent Deadlock on Producer Network Thread

    [ https://issues.apache.org/jira/browse/KAFKA-10790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17244252#comment-17244252 ] 

GeordieMai edited comment on KAFKA-10790 at 12/4/20, 7:23 PM:
--------------------------------------------------------------

[~chia7712] Can I take this issue?
When flush() is called inside a callback, we could:

throw an exception to tell the user that this is not allowed,

or make flush() a no-op in that case,

or make flush() work correctly from the callback.

What do you think?


was (Author: geordie):
[~chia7712] Can I take this issue?
when flush method is called in  callback , 

throw a exception to notify user to prevent  it

or just make flush method not working 

or make flush method work fun 

what do you think ?

> Detect/Prevent Deadlock on Producer Network Thread
> --------------------------------------------------
>
>                 Key: KAFKA-10790
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10790
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 2.6.0, 2.7.0
>            Reporter: Gary Russell
>            Priority: Major
>
> I realize this is contrived, but I stumbled across the problem while testing some library code with 2.7.0 RC3 (although the issue is not limited to 2.7).
> For example, calling flush() from the producer callback deadlocks the network thread (and blocks any later attempt to close the producer).
> {code:java}
> producer.send(new ProducerRecord<>("foo", "bar"), (rm, ex) -> {
> 	// This callback runs on the producer network thread, so flush() blocks that thread.
> 	producer.flush();
> });
> Thread.sleep(1000);
> producer.close();
> {code}
> It took some time to figure out why the close was blocked.
> There is existing logic in close() to avoid it blocking if called from the callback; perhaps similar logic could be added to flush() (and any other methods that might block), even if it means throwing an exception to make it clear that you can't call flush() from the callback. 
> These stack traces are with the 2.6.0 client.
> {noformat}
> "main" #1 prio=5 os_prio=31 cpu=1333.10ms elapsed=13.05s tid=0x00007ff259012800 nid=0x2803 in Object.wait()  [0x000070000fda5000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> 	at java.lang.Object.wait(java.base@14.0.2/Native Method)
> 	- waiting on <0x0000000700d00000> (a org.apache.kafka.common.utils.KafkaThread)
> 	at java.lang.Thread.join(java.base@14.0.2/Thread.java:1297)
> 	- locked <0x0000000700d00000> (a org.apache.kafka.common.utils.KafkaThread)
> 	at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1205)
> 	at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1182)
> 	at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1158)
> 	at com.example.demo.Rk1Application.lambda$2(Rk1Application.java:55)
> "kafka-producer-network-thread | producer-1" #24 daemon prio=5 os_prio=31 cpu=225.80ms elapsed=11.64s tid=0x00007ff256963000 nid=0x7103 waiting on condition  [0x0000700011d04000]
>    java.lang.Thread.State: WAITING (parking)
> 	at jdk.internal.misc.Unsafe.park(java.base@14.0.2/Native Method)
> 	- parking to wait for  <0x00000007020b27e0> (a java.util.concurrent.CountDownLatch$Sync)
> 	at java.util.concurrent.locks.LockSupport.park(java.base@14.0.2/LockSupport.java:211)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@14.0.2/AbstractQueuedSynchronizer.java:714)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@14.0.2/AbstractQueuedSynchronizer.java:1046)
> 	at java.util.concurrent.CountDownLatch.await(java.base@14.0.2/CountDownLatch.java:232)
> 	at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
> 	at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:712)
> 	at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1111)
> 	at com.example.demo.Rk1Application.lambda$3(Rk1Application.java:52)
> 	at com.example.demo.Rk1Application$$Lambda$528/0x0000000800e28840.onCompletion(Unknown Source)
> 	at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
> 	at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:228)
> 	at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)
> 	at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:653)
> 	at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634)
> 	at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:554)
> 	at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$0(Sender.java:743)
> 	at org.apache.kafka.clients.producer.internals.Sender$$Lambda$642/0x0000000800ea2040.onComplete(Unknown Source)
> 	at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> 	at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:566)
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:558)
> 	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
> 	at java.lang.Thread.run(java.base@14.0.2/Thread.java:832)
> {noformat}
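
The first option raised in the comment (throw an exception), which the issue description also suggests for flush(), can be sketched without any Kafka dependency. This is only an illustration under stated assumptions: GuardedFlushSketch, send(Runnable), flushFromCallback() and the thread name are hypothetical and are not Kafka APIs. A single-threaded executor stands in for the producer network thread. Without the thread check, flush() called from a callback would wait on a latch that only the blocked thread can count down, which is the same shape as the stack trace above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a producer with one I/O thread; not Kafka code. */
public class GuardedFlushSketch {
    // Set by the thread factory before the I/O thread starts.
    private volatile Thread ioThread;

    private final ExecutorService io = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-io-thread");
        t.setDaemon(true);
        ioThread = t;
        return t;
    });

    /** Runs the callback on the I/O thread, like a send-completion callback. */
    public void send(Runnable callback) {
        io.execute(callback);
    }

    /** Waits for all queued work to finish; refuses to run on the I/O thread. */
    public void flush() {
        if (Thread.currentThread() == ioThread) {
            // Fail fast instead of waiting on work only this thread can complete.
            throw new IllegalStateException("flush() called from a callback on the I/O thread");
        }
        CountDownLatch done = new CountDownLatch(1);
        io.execute(done::countDown);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public void close() {
        io.shutdownNow();
    }

    /** Calls flush() inside a callback and returns what it threw, or null. */
    public static Throwable flushFromCallback() {
        GuardedFlushSketch p = new GuardedFlushSketch();
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch finished = new CountDownLatch(1);
        p.send(() -> {
            try {
                p.flush();
            } catch (Throwable t) {
                seen.set(t);
            } finally {
                finished.countDown();
            }
        });
        try {
            finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        p.close();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(flushFromCallback());
    }
}
```

Whether an exception, a no-op, or a working flush() is the right behavior is the open question in the comment; the sketch only shows that detecting the calling thread is enough to turn the silent hang into an immediate, visible failure.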


