Posted to jira@kafka.apache.org by "Lucas Bradstreet (Jira)" <ji...@apache.org> on 2020/01/11 23:45:00 UTC
[jira] [Created] (KAFKA-9401) High lock contention for kafka.server.FetchManager.newContext
Lucas Bradstreet created KAFKA-9401:
---------------------------------------
Summary: High lock contention for kafka.server.FetchManager.newContext
Key: KAFKA-9401
URL: https://issues.apache.org/jira/browse/KAFKA-9401
Project: Kafka
Issue Type: Improvement
Components: core
Reporter: Lucas Bradstreet
kafka.server.FetchManager.newContext takes out what is essentially a global fetch lock on kafka.server.FetchSessionCache. The lock is held not only while updating the FetchSessionCache itself but also while updating the fetch sessions stored within it. This causes heavy lock contention for fetches, because every fetch request must go through this lock.
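To illustrate why a single cache-wide monitor serializes all fetch requests, here is a toy Java model. It is not Kafka code, and GlobalLockModel, handleFetch, epochOf and simulate are hypothetical names. Each simulated fetch only updates its own session, but it does so inside synchronized on the one shared map. As a result, at most one handler thread is ever inside the critical section, however many sessions exist.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the locking pattern described above:
// every request synchronizes on one shared cache object, even though it
// only needs to modify its own session.
class GlobalLockModel {
    // Stand-in for FetchSessionCache: session id -> session epoch.
    private final Map<Integer, Integer> sessions = new HashMap<>();
    private final AtomicInteger inCriticalSection = new AtomicInteger();
    private final AtomicInteger maxObserved = new AtomicInteger();

    GlobalLockModel(int numSessions) {
        for (int id = 0; id < numSessions; id++) {
            sessions.put(id, 0);
        }
    }

    // Stand-in for an incremental fetch going through newContext.
    void handleFetch(int sessionId) {
        synchronized (sessions) { // the cache-wide lock: one request at a time
            int now = inCriticalSection.incrementAndGet();
            maxObserved.accumulateAndGet(now, Math::max);
            sessions.merge(sessionId, 1, Integer::sum); // bump this session's epoch
            inCriticalSection.decrementAndGet();
        }
    }

    // Largest number of threads ever observed inside the critical section.
    int maxConcurrentHolders() {
        return maxObserved.get();
    }

    int epochOf(int sessionId) {
        synchronized (sessions) {
            return sessions.get(sessionId);
        }
    }

    // Runs `requests` fetches, spread round-robin over `numSessions` sessions,
    // on a pool of `threads` handler threads.
    static GlobalLockModel simulate(int threads, int numSessions, int requests) {
        GlobalLockModel model = new GlobalLockModel(numSessions);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < requests; i++) {
            final int sessionId = i % numSessions;
            pool.submit(() -> model.handleFetch(sessionId));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return model;
    }
}
{code}

For example, GlobalLockModel.simulate(8, 4, 10_000) runs 10,000 fetches on 8 threads against 4 independent sessions. maxConcurrentHolders() still reports 1, because the whole per-session update runs under the one shared monitor.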
I have taken an async-profiler lock profile on a high-throughput cluster, and I see around 25 s of waiting on this lock during a sixty-second profile:
--- 25818577497 ns (20.84%), 5805 samples
[ 0] kafka.server.FetchSessionCache
[ 1] kafka.server.FetchManager.newContext
[ 2] kafka.server.KafkaApis.handleFetchRequest
[ 3] kafka.server.KafkaApis.handle
[ 4] kafka.server.KafkaRequestHandler.run
[ 5] java.lang.Thread.run
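The numbers above can be put in perspective with a little arithmetic. This reading assumes that the reported nanoseconds are wait time summed over all request handler threads. Under that assumption, dividing the aggregate wait by the profile duration gives the average number of threads blocked on this monitor at any moment. The class and method names are made up for the example, and the constants are copied from the profile.

{code:java}
// Back-of-the-envelope reading of the lock profile above (hypothetical helper).
// Assumption: the wait time is summed across all threads waiting on the monitor.
class ProfileArithmetic {
    static final long WAIT_NS = 25_818_577_497L; // wait attributed to this stack
    static final double PROFILE_SECONDS = 60.0;  // duration of the profile

    // Convert the reported nanoseconds to seconds.
    static double waitSeconds() {
        return WAIT_NS / 1e9;
    }

    // Aggregate wait divided by wall-clock duration: the average number of
    // threads parked on this lock at any instant during the profile.
    static double averageBlockedThreads() {
        return waitSeconds() / PROFILE_SECONDS;
    }
}
{code}

waitSeconds() comes to about 25.8 s and averageBlockedThreads() to about 0.43. That is the equivalent of roughly 0.43 request handler threads doing nothing but waiting on FetchSessionCache for the entire profile.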
{code:scala}
cache.synchronized { // cache-wide lock: every fetch request serializes here
  cache.get(reqMetadata.sessionId) match {
    case None => {
      debug(s"Session error for ${reqMetadata.sessionId}: no such session ID found.")
      new SessionErrorContext(Errors.FETCH_SESSION_ID_NOT_FOUND, reqMetadata)
    }
    case Some(session) => session.synchronized { // per-session lock, nested inside the cache lock
      if (session.epoch != reqMetadata.epoch) {
        debug(s"Session error for ${reqMetadata.sessionId}: expected epoch " +
          s"${session.epoch}, but got ${reqMetadata.epoch} instead.")
        new SessionErrorContext(Errors.INVALID_FETCH_SESSION_EPOCH, reqMetadata)
      } else {
        val (added, updated, removed) = session.update(fetchData, toForget, reqMetadata)
        if (session.isEmpty) {
          debug(s"Created a new sessionless FetchContext and closing session id ${session.id}, " +
            s"epoch ${session.epoch}: after removing ${partitionsToLogString(removed)}, " +
            s"there are no more partitions left.")
          cache.remove(session)
          new SessionlessFetchContext(fetchData)
        } else {
          // Record the session's last-used time in the cache (still under the cache lock)
          cache.touch(session, time.milliseconds())
          session.epoch = JFetchMetadata.nextEpoch(session.epoch)
          debug(s"Created a new incremental FetchContext for session id ${session.id}, " +
            s"epoch ${session.epoch}: added ${partitionsToLogString(added)}, " +
            s"updated ${partitionsToLogString(updated)}, " +
            s"removed ${partitionsToLogString(removed)}")
          new IncrementalFetchContext(time, reqMetadata, session)
        }
      }
    }
  }
}
{code}
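The control flow of this excerpt can be modeled without Kafka's types. The Java sketch below is a hypothetical simplification:

* NewContextModel, its Outcome enum and its partition-count bookkeeping stand in for FetchSession, the FetchContext subclasses and session.update.
* nextEpoch uses an assumed rule (increment, wrapping from Integer.MAX_VALUE back to 1) rather than calling JFetchMetadata.nextEpoch.

The sketch keeps the same lock nesting as the excerpt: the cache monitor is held for the entire call, including all of the per-session work.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the newContext decision logic shown above.
class NewContextModel {
    enum Outcome { SESSION_ID_NOT_FOUND, INVALID_EPOCH, SESSIONLESS, INCREMENTAL }

    static final class Session {
        final int id;
        int epoch;
        int partitions;  // number of partitions tracked by the session
        long lastUsedMs; // stand-in for the state updated by cache.touch
        Session(int id, int epoch, int partitions) {
            this.id = id;
            this.epoch = epoch;
            this.partitions = partitions;
        }
    }

    // Assumed epoch rule for this sketch only: increment, wrapping to 1.
    static int nextEpoch(int epoch) {
        return epoch == Integer.MAX_VALUE ? 1 : epoch + 1;
    }

    private final Map<Integer, Session> cache = new HashMap<>();

    void put(Session s) { synchronized (cache) { cache.put(s.id, s); } }

    Session get(int id) { synchronized (cache) { return cache.get(id); } }

    // added/removed are partition-count deltas carried by the request.
    Outcome newContext(int sessionId, int requestEpoch, int added, int removed, long nowMs) {
        synchronized (cache) {                 // cache lock held for the whole call
            Session session = cache.get(sessionId);
            if (session == null) {
                return Outcome.SESSION_ID_NOT_FOUND;
            }
            synchronized (session) {           // per-session lock, nested inside
                if (session.epoch != requestEpoch) {
                    return Outcome.INVALID_EPOCH;
                }
                session.partitions += added - removed;
                if (session.partitions <= 0) {
                    cache.remove(sessionId);   // an empty session is closed
                    return Outcome.SESSIONLESS;
                }
                session.lastUsedMs = nowMs;    // stand-in for cache.touch
                session.epoch = nextEpoch(session.epoch);
                return Outcome.INCREMENTAL;
            }
        }
    }
}
{code}

A session with epoch 1 that receives a request carrying epoch 1 yields INCREMENTAL and advances to epoch 2. A request with a stale epoch yields INVALID_EPOCH and leaves the session unchanged. Every outcome, including the error paths, is decided while the cache-wide monitor is held.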
Contention has been made worse by the solution for "KAFKA-9137: Fix incorrect FetchSessionCache eviction logic" (https://github.com/apache/kafka/pull/7640), as the cache is correctly touched now, whereas previously the touch was being skipped.
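The small model below sketches why an extra touch adds to contention. For illustration, it assumes that the cache keeps its sessions in an index ordered by last-used time, so that eviction can find the least recently used one. Under that assumption, each touch is a remove-and-reinsert on a shared, non-thread-safe ordered structure, and so it has to happen while the cache lock is held. TouchModel and its methods are hypothetical names; this is not FetchSessionCache's actual implementation.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical LRU-style index: sessions ordered by last-used time for eviction.
class TouchModel {
    // Ordering key: last-used time, with the session id as a tie-breaker.
    static final class Key implements Comparable<Key> {
        final long lastUsedMs;
        final int sessionId;
        Key(long lastUsedMs, int sessionId) {
            this.lastUsedMs = lastUsedMs;
            this.sessionId = sessionId;
        }
        @Override
        public int compareTo(Key other) {
            int byTime = Long.compare(lastUsedMs, other.lastUsedMs);
            return byTime != 0 ? byTime : Integer.compare(sessionId, other.sessionId);
        }
    }

    private final Map<Integer, Key> keyById = new HashMap<>();
    private final TreeMap<Key, Integer> lastUsed = new TreeMap<>();

    synchronized void add(int sessionId, long nowMs) {
        Key key = new Key(nowMs, sessionId);
        keyById.put(sessionId, key);
        lastUsed.put(key, sessionId);
    }

    // A touch removes the old key and re-inserts a new one: a structural change
    // to state shared by every request, so it runs under the cache-wide monitor.
    synchronized void touch(int sessionId, long nowMs) {
        Key old = keyById.get(sessionId);
        if (old == null) {
            return;
        }
        lastUsed.remove(old);
        add(sessionId, nowMs); // re-entrant: already holding this monitor
    }

    // Evicts and returns the least recently used session id, or -1 if empty.
    synchronized int evictOldest() {
        Map.Entry<Key, Integer> first = lastUsed.pollFirstEntry();
        if (first == null) {
            return -1;
        }
        keyById.remove(first.getValue());
        return first.getValue();
    }
}
{code}

Suppose sessions 1 and 2 are added at times 0 and 10, and session 1 is then touched at time 20. Session 2 becomes the first eviction candidate. If the touch is skipped, the still-active session 1 would be evicted first instead. Correct eviction therefore requires the touch, and in this model every touch is one more mutation performed under the shared lock.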
--
This message was sent by Atlassian Jira
(v8.3.4#803005)