You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/07 10:22:18 UTC

[GitHub] [kafka] g1geordie opened a new pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

g1geordie opened a new pull request #9707:
URL: https://github.com/apache/kafka/pull/9707


   KafkaProducer.flush method in callback will cause deadlock .
   
   because flush method  wait the future complete
   ``` java
   //  KafkaProducer
   public void flush() {
      .....
           try {
               this.accumulator.awaitFlushCompletion();
           } catch (InterruptedException e) {
               throw new InterruptException("Flush interrupted.", e);
           }
   }
   
   //RecordAccumulator
       public void awaitFlushCompletion() throws InterruptedException {
           try {
               for (ProducerBatch batch : this.incomplete.copyAll())
                   batch.produceFuture.await();
           } finally {
               this.flushesInProgress.decrementAndGet();
           }
       }
   ```
   
   but  future complete after the callback .
   ```java
   //ProducerBatch
    private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
        ...
           // execute callbacks
           for (Thunk thunk : thunks) {
               try {
                   if (exception == null) {
                       RecordMetadata metadata = thunk.future.value();
                       if (thunk.callback != null)
                           thunk.callback.onCompletion(metadata, null);
                   } else {
                       if (thunk.callback != null)
                           thunk.callback.onCompletion(null, exception);
                   }
               } catch (Exception e) {
                   log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
               }
           }
   
           produceFuture.done();
    }
   ```
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#discussion_r537998894



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -391,6 +391,23 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat
         };
     }
 
+    @Test(timeout = 10000)
+    public void testFlushInCallbackNotCauseDeadlock() throws InterruptedException {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        ProducerMetadata metadata = mock(ProducerMetadata.class);
+
+        when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, onePartitionCluster);
+
+        KafkaProducer<String, String> producer = producerWithOverrideNewSender(configs, metadata);
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
+
+        producer.send(record, (metadata1, exception) -> producer.flush());

Review comment:
       Could you add a flag after ```producer.flush()```? We should make sure ```producer.flush()``` fails.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] g1geordie commented on a change in pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
g1geordie commented on a change in pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#discussion_r538239661



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -391,6 +391,23 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat
         };
     }
 
+    @Test(timeout = 10000)
+    public void testFlushInCallbackNotCauseDeadlock() throws InterruptedException {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        ProducerMetadata metadata = mock(ProducerMetadata.class);
+
+        when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, onePartitionCluster);
+
+        KafkaProducer<String, String> producer = producerWithOverrideNewSender(configs, metadata);
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
+
+        producer.send(record, (metadata1, exception) -> producer.flush());

Review comment:
       Thank you  . I add a Assert.fail after flush 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#issuecomment-743714552


   @hachikuji FYI


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] g1geordie commented on pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
g1geordie commented on pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#issuecomment-739827572


   @chia7712 Can you help me review?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#discussion_r537998495



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -1115,6 +1115,10 @@ private void ensureValidRecordSize(int size) {
      */
     @Override
     public void flush() {
+        boolean invokedFromCallback = Thread.currentThread() == this.ioThread;

Review comment:
       As this check is used by ```close``` also, could you add a helper method to return boolean?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] g1geordie commented on a change in pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
g1geordie commented on a change in pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#discussion_r540045129



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -131,6 +131,14 @@
             Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)),
             Collections.emptySet(),
             Collections.emptySet());
+
+    private final Cluster oneLeaderCluster = new Cluster(

Review comment:
       you are so nice !




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#discussion_r539920047



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -131,6 +131,14 @@
             Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)),
             Collections.emptySet(),
             Collections.emptySet());
+
+    private final Cluster oneLeaderCluster = new Cluster(

Review comment:
       Thanks for your explanation. ```oneLeaderCluster``` is used by this test case only. Could you make it be a local variable? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] g1geordie commented on a change in pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
g1geordie commented on a change in pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#discussion_r538507226



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -391,6 +391,27 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat
         };
     }
 
+    @Test(timeout = 10000)
+    public void testFlushInCallbackNotCauseDeadlock() throws InterruptedException {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        ProducerMetadata metadata = mock(ProducerMetadata.class);
+
+        when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, onePartitionCluster);
+
+        KafkaProducer<String, String> producer = producerWithOverrideNewSender(configs, metadata);
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
+
+        producer.send(record, (metadata1, exception) -> {
+            producer.flush();
+            Assert.fail("Flush method needs to throw exception.");

Review comment:
       move assert to main thread 

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -1079,6 +1079,13 @@ private void ensureValidRecordSize(int size) {
                     " configuration.");
     }
 
+    /**
+     * @return is called in callback.

Review comment:
       remove 

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -1201,7 +1211,7 @@ private void close(Duration timeout, boolean swallowException) {
 
         // this will keep track of the first encountered exception
         AtomicReference<Throwable> firstException = new AtomicReference<>();
-        boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
+        boolean invokedFromCallback = isInvokedFromCallback();

Review comment:
       direct to method 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#discussion_r539835667



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -131,6 +131,14 @@
             Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)),
             Collections.emptySet(),
             Collections.emptySet());
+
+    private final Cluster oneLeaderCluster = new Cluster(

Review comment:
       Pardon me, why this change is required.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] g1geordie commented on pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
g1geordie commented on pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#issuecomment-747188758


   @ijuma hello 
   
   The close method want to prevent self-join but there are no deadlock  . 
   
   The flush method  in callback has a deadlock.
   The deadlock is because  `flush` wait the `sender` send all message .
   but messages are done after callback complete .
   **flush in callback  , callback will never complete .**
   
   `flush`'s semanteme is send message . 
   so wait all message send I think it's necessary .
   
   Messages are done  after callback I think is option ,
   Or we can make async to `callback`  ,then messages will done no matter `callback` is complete. 
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#discussion_r538242938



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -1201,7 +1211,7 @@ private void close(Duration timeout, boolean swallowException) {
 
         // this will keep track of the first encountered exception
         AtomicReference<Throwable> firstException = new AtomicReference<>();
-        boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
+        boolean invokedFromCallback = isInvokedFromCallback();

Review comment:
       this local variable is redundant
   

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -1079,6 +1079,13 @@ private void ensureValidRecordSize(int size) {
                     " configuration.");
     }
 
+    /**
+     * @return is called in callback.

Review comment:
       I feel the method name is good to be document.

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -391,6 +391,27 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat
         };
     }
 
+    @Test(timeout = 10000)
+    public void testFlushInCallbackNotCauseDeadlock() throws InterruptedException {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        ProducerMetadata metadata = mock(ProducerMetadata.class);
+
+        when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, onePartitionCluster);
+
+        KafkaProducer<String, String> producer = producerWithOverrideNewSender(configs, metadata);
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
+
+        producer.send(record, (metadata1, exception) -> {
+            producer.flush();
+            Assert.fail("Flush method needs to throw exception.");

Review comment:
       I don't think this is valid since this callback is triggered by another thread. The assert failure can't be aware by test thread.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] g1geordie commented on a change in pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
g1geordie commented on a change in pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#discussion_r539879001



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -131,6 +131,14 @@
             Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)),
             Collections.emptySet(),
             Collections.emptySet());
+
+    private final Cluster oneLeaderCluster = new Cluster(

Review comment:
       if the message is not sent  , the thread will have a infinite loop
   
   ```
   #Sender 
   while(  ... .  this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0  ...)
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#discussion_r538511924



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -391,6 +391,31 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat
         };
     }
 
+    @Test(timeout = 10000)
+    public void testFlushInCallbackNotCauseDeadlock() throws InterruptedException {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        ProducerMetadata metadata = mock(ProducerMetadata.class);
+
+        when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, onePartitionCluster);
+
+        KafkaProducer<String, String> producer = producerWithOverrideNewSender(configs, metadata);
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
+
+        AtomicBoolean  exceptionThrow = new AtomicBoolean(false);

Review comment:
       Could you save the exception caused by ```producer.flush()``` and then check the error type in the test thread?

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -1115,6 +1119,9 @@ private void ensureValidRecordSize(int size) {
      */
     @Override
     public void flush() {
+        if (isInvokedFromCallback())
+            throw new UnsupportedOperationException("Can't specify flush method in callback.");

Review comment:
       It seems to me ```IllegalStateException``` is more suitable for this case. Also, please update docs of ```flush()```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#issuecomment-745304984


   Have we considered whether we could do something similar to `close` instead of throwing an exception? Not saying that's a good idea, just want to make sure we thought through the options.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#discussion_r538983484



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -1116,11 +1116,12 @@ private boolean isInvokedFromCallback() {
      * </p>
      *
      * @throws InterruptException If the thread is interrupted while blocked
+     * @throws IllegalStateException If flush method in callback

Review comment:
       ```If the thread is callback thread. Calling flush() from callback is not allowed since it makes deadlock.```

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -1116,11 +1116,12 @@ private boolean isInvokedFromCallback() {
      * </p>
      *
      * @throws InterruptException If the thread is interrupted while blocked
+     * @throws IllegalStateException If flush method in callback
      */
     @Override
     public void flush() {
         if (isInvokedFromCallback())
-            throw new UnsupportedOperationException("Can't specify flush method in callback.");
+            throw new IllegalStateException("Can't specify flush method in callback.");

Review comment:
       ```Calling flush() from callback is not allowed since it makes deadlock.```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] g1geordie commented on pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
g1geordie commented on pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#issuecomment-741968372


   @chia7712  thanks again 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] g1geordie commented on a change in pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
g1geordie commented on a change in pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#discussion_r539869906



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -131,6 +131,14 @@
             Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)),
             Collections.emptySet(),
             Collections.emptySet());
+
+    private final Cluster oneLeaderCluster = new Cluster(

Review comment:
       producer.close(Duration.of(0))  will force close without sending all message . 
   producer.close()  will gracefully close after sending all message .
   
   the callback method will be invoke after  one of above  does
   
   when we want to  send message  , we need to have
   -  A Cluster has a leader partition 
   -  Mockclient  prepare a fake response
   
   So I add a cluster with one leader  




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#issuecomment-756071279


   Personally, the callback is a bit weird to me. The callback mechanism makes “internal” thread to handle user-defined code. The user code to handle response/error is vary different from serialization so it seems to me that callback causes some problems.
   
   1. one thread’s callback can hurt other threads when we share a prouder to different threads.
   2. bad performance due to “single” thread
   3. can’t isolate the internal thread from user
   4. callback has some cryptic limits (for instance, this issue and #9842)
   
   I prefer the Future that user can handle it by user-defined thread. Also, it  does not have limits like callback and user code can be isolated from internal thread. For another, we should consider CompletabeFuture as we are already in JDK 8.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#discussion_r538514230



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -391,6 +391,31 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat
         };
     }
 
+    @Test(timeout = 10000)
+    public void testFlushInCallbackNotCauseDeadlock() throws InterruptedException {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        ProducerMetadata metadata = mock(ProducerMetadata.class);
+
+        when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, onePartitionCluster);
+
+        KafkaProducer<String, String> producer = producerWithOverrideNewSender(configs, metadata);

Review comment:
       Could you use try block?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org