Posted to jira@kafka.apache.org by "radai rosenblatt (Jira)" <ji...@apache.org> on 2020/05/14 20:04:00 UTC
[jira] [Created] (KAFKA-9998) KafkaProducer.close(timeout) still may block indefinitely
radai rosenblatt created KAFKA-9998:
---------------------------------------
Summary: KafkaProducer.close(timeout) still may block indefinitely
Key: KAFKA-9998
URL: https://issues.apache.org/jira/browse/KAFKA-9998
Project: Kafka
Issue Type: Bug
Affects Versions: 2.4.1
Reporter: radai rosenblatt
Looking at KafkaProducer.close(timeout), we have this:
{code:java}
private void close(Duration timeout, boolean swallowException) {
    long timeoutMs = timeout.toMillis();
    if (timeoutMs < 0)
        throw new IllegalArgumentException("The timeout cannot be negative.");
    log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);
    // this will keep track of the first encountered exception
    AtomicReference<Throwable> firstException = new AtomicReference<>();
    boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
    if (timeoutMs > 0) {
        if (invokedFromCallback) {
            log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
                    "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.",
                    timeoutMs);
        } else {
            // Try to close gracefully.
            if (this.sender != null)
                this.sender.initiateClose();
            if (this.ioThread != null) {
                try {
                    this.ioThread.join(timeoutMs); // <---- GRACEFUL JOIN
                } catch (InterruptedException t) {
                    firstException.compareAndSet(null, new InterruptException(t));
                    log.error("Interrupted while joining ioThread", t);
                }
            }
        }
    }
    if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
        log.info("Proceeding to force close the producer since pending requests could not be completed " +
                "within timeout {} ms.", timeoutMs);
        this.sender.forceClose();
        // Only join the sender thread when not calling from callback.
        if (!invokedFromCallback) {
            try {
                this.ioThread.join(); // <----- UNBOUNDED JOIN
            } catch (InterruptedException e) {
                firstException.compareAndSet(null, new InterruptException(e));
            }
        }
    }
    // ...
}
{code}
Specifically, in our case the ioThread was running a (very) long-running, user-provided callback, which prevented the producer from closing within the given timeout.
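The hazard can be reproduced without Kafka. In the minimal sketch below, a plain thread stands in for the producer's ioThread and sits inside a slow "callback" that never checks the shutdown flag. The class and method names (`BlockedCloseDemo`, `slowCallback`, `stillAliveAfterClose`) are hypothetical, and reducing `initiateClose()`/`forceClose()` to a single flag is a simplifying assumption; the point it illustrates is that a close request is not preemption, so code already running in a callback keeps the thread alive past the timeout. Both joins are bounded here only so the demo terminates; the thread still being alive afterwards is exactly the case in which the unbounded `join()` above would block.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class BlockedCloseDemo {

    // Hypothetical stand-in for a slow user-provided callback: it runs for
    // callbackMs and never checks any shutdown flag.
    static void slowCallback(long callbackMs) {
        try {
            TimeUnit.MILLISECONDS.sleep(callbackMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Starts a fake "io thread" that is busy inside slowCallback, asks it to stop,
    // then performs a graceful join and a second join, BOTH bounded by timeoutMs.
    // Returns true if the thread is still alive, i.e. an unbounded join() would block.
    static boolean stillAliveAfterClose(long timeoutMs, long callbackMs) throws InterruptedException {
        if (timeoutMs <= 0) {
            // Thread.join(0) waits forever, so a non-positive bound would defeat the demo.
            throw new IllegalArgumentException("timeoutMs must be positive");
        }
        AtomicBoolean running = new AtomicBoolean(true); // simplified stop flag (assumption)
        CountDownLatch inCallback = new CountDownLatch(1);
        Thread ioThread = new Thread(() -> {
            while (running.get()) {
                inCallback.countDown();
                slowCallback(callbackMs); // the flag is not consulted while in here
            }
        }, "demo-io-thread");
        ioThread.setDaemon(true); // do not keep the JVM alive if the thread is stuck
        ioThread.start();
        inCallback.await();       // ensure the thread is inside the callback before closing

        running.set(false);        // stands in for initiateClose()/forceClose(): a request only
        ioThread.join(timeoutMs);  // graceful join, bounded
        ioThread.join(timeoutMs);  // second join, bounded here; a bare join() would wait for the callback
        return ioThread.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        // 100 ms close timeout vs. a 5 s callback: the thread outlives both joins.
        System.out.println("io thread still alive: " + stillAliveAfterClose(100, 5_000));
    }
}
```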
I think the second join() call should either be _VERY_ short (since we are already past the timeout at that stage) or not happen at all.
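One way to implement that suggestion is to compute a single deadline up front and bound both joins by whatever time remains, skipping the second join entirely once the deadline has passed. One detail needs explicit handling: in Java, `Thread.join(0)` means "wait forever", so the remaining time must be checked before each join rather than passed through as zero. The sketch below is hypothetical: `BoundedClose`, `joinUntil`, and the `Runnable` parameters standing in for `sender.initiateClose()` and `sender.forceClose()` are not Kafka API, and this is not taken from the Kafka codebase or necessarily how this issue was resolved.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

public final class BoundedClose {

    // Largest Duration that Duration.toNanos() can represent without overflowing.
    private static final Duration MAX_WAIT = Duration.ofNanos(Long.MAX_VALUE);

    private BoundedClose() {}

    // Waits for `thread` only until deadlineNanos (a System.nanoTime() value).
    // Skips the join when no time is left, because Thread.join(0) waits forever.
    private static void joinUntil(Thread thread, long deadlineNanos) throws InterruptedException {
        long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
        if (remainingMs > 0) {
            thread.join(remainingMs);
        }
    }

    // Hypothetical two-phase close: request a graceful stop, then force close,
    // with both waits sharing ONE deadline so the total wait stays within
    // `timeout` (plus scheduling slack). Returns true if the thread terminated.
    public static boolean close(Thread ioThread, Runnable initiateClose, Runnable forceClose,
                                Duration timeout) throws InterruptedException {
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        // Mirror the original's self-join guard: never wait on the current thread.
        Duration effective = (Thread.currentThread() == ioThread) ? Duration.ZERO : timeout;
        // Cap huge timeouts (e.g. Duration.ofMillis(Long.MAX_VALUE)) so toNanos() cannot throw.
        Duration capped = effective.compareTo(MAX_WAIT) > 0 ? MAX_WAIT : effective;
        // The sum may wrap; differences between nanoTime-based values remain correct.
        long deadlineNanos = System.nanoTime() + capped.toNanos();

        initiateClose.run();
        joinUntil(ioThread, deadlineNanos);      // graceful phase, bounded by the deadline

        if (ioThread.isAlive()) {
            forceClose.run();
            joinUntil(ioThread, deadlineNanos);  // no-op once the deadline has passed
        }
        return !ioThread.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        // A thread stuck for 5 s in "user code" that ignores close requests.
        Thread stuck = new Thread(() -> {
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        });
        stuck.setDaemon(true);
        stuck.start();

        long start = System.nanoTime();
        boolean terminated = close(stuck, () -> {}, () -> {}, Duration.ofMillis(200));
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("terminated=" + terminated + " after ~" + elapsedMs + " ms");
    }
}
```

Sharing one deadline gives the force-close phase only what the graceful phase left over, which covers both options above: a very short second wait, or none at all once the deadline has passed. The trade-off is that close() can return while the io thread is still running, so any cleanup that follows it would have to tolerate that.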
--
This message was sent by Atlassian Jira
(v8.3.4#803005)