You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "radai rosenblatt (Jira)" <ji...@apache.org> on 2020/05/19 16:24:00 UTC

[jira] [Comment Edited] (KAFKA-9998) KafkaProducer.close(timeout) still may block indefinitely

    [ https://issues.apache.org/jira/browse/KAFKA-9998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17111317#comment-17111317 ] 

radai rosenblatt edited comment on KAFKA-9998 at 5/19/20, 4:23 PM:
-------------------------------------------------------------------

i think thats wrong, for several reasons:
 # the io thread will (eventually) terminate because this.sender.forceClose() has been called - so no resource leak. just a delay in freeing resources.
 # just like schedulers/executor service classes in the JDK, there's no need to make close() block until all resources have been released
 # the code already avoids joining if called from a callback handler, further proving that there's no potential leak risk.
 # this violates the caller thread's timeout argument, where elsewhere the close method seems to honor it - thats just inconsistent API behavior - either the producer respects timeout or it does not. "sometimes" is hard to explain to users.

seems to me this part was jjust forgotten when timeout support was added


was (Author: radai):
i think thats wrong, for several reasons:
 # the io thread will (eventually) terminate because this.sender.initiateClose() has been called - so no resource leak. just a delay in freeing resources.
 # just like schedulers/executor service classes in the JDK, there's no need to make close() block until all resources have been released
 # this violates the caller thread's timeout argument, where elsewhere the close method seems to honor it - thats just inconsistent API behavior - either the producer respects timeout or it does not. "sometimes" is hard to explain to users.

seems to me this part was jjust forgotten when timeout support was added

> KafkaProducer.close(timeout) still may block indefinitely
> ---------------------------------------------------------
>
>                 Key: KAFKA-9998
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9998
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.1
>            Reporter: radai rosenblatt
>            Priority: Major
>
> looking at KafkaProducer.close(timeout), we have this:
> {code:java}
> private void close(Duration timeout, boolean swallowException) {
>     long timeoutMs = timeout.toMillis();
>     if (timeoutMs < 0)
>         throw new IllegalArgumentException("The timeout cannot be negative.");
>     log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);
>     // this will keep track of the first encountered exception
>     AtomicReference<Throwable> firstException = new AtomicReference<>();
>     boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
>     if (timeoutMs > 0) {
>         if (invokedFromCallback) {
>             log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
>                     "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.",
>                     timeoutMs);
>         } else {
>             // Try to close gracefully.
>             if (this.sender != null)
>                 this.sender.initiateClose();
>             if (this.ioThread != null) {
>                 try {
>                     this.ioThread.join(timeoutMs);    <---- GRACEFUL JOIN
>                 } catch (InterruptedException t) {
>                     firstException.compareAndSet(null, new InterruptException(t));
>                     log.error("Interrupted while joining ioThread", t);
>                 }
>             }
>         }
>     }
>     if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
>         log.info("Proceeding to force close the producer since pending requests could not be completed " +
>                 "within timeout {} ms.", timeoutMs);
>         this.sender.forceClose();
>         // Only join the sender thread when not calling from callback.
>         if (!invokedFromCallback) {
>             try {
>                 this.ioThread.join();   <----- UNBOUNDED JOIN
>             } catch (InterruptedException e) {
>                 firstException.compareAndSet(null, new InterruptException(e));
>             }
>         }
>     }
> ...
> }
> {code}
> specifically in our case the ioThread was running a (very) long running user-provided callback which was preventing the producer from closing within the given timeout.
>  
> I think the 2nd join() call should either be _VERY_ short (since we're already past the timeout at that stage) ir should not happen at all.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)