Posted to jira@kafka.apache.org by "Lucas Bradstreet (Jira)" <ji...@apache.org> on 2020/01/11 23:46:00 UTC

[jira] [Updated] (KAFKA-9401) High lock contention for kafka.server.FetchManager.newContext

     [ https://issues.apache.org/jira/browse/KAFKA-9401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lucas Bradstreet updated KAFKA-9401:
------------------------------------
    Description: 
kafka.server.FetchManager.newContext takes out what is essentially a global fetch lock on kafka.server.FetchSessionCache, and holds it not only while updating the FetchSessionCache itself but also while updating the fetch sessions stored within it. This causes a high amount of lock contention for fetches, as every fetch request must go through this lock.

I have taken an async-profiler lock profile on a high-throughput cluster, and I see roughly 25.8 s of waiting on this lock within a sixty-second profile:
{noformat}
--- 25818577497 ns (20.84%), 5805 samples
 [ 0] kafka.server.FetchSessionCache
 [ 1] kafka.server.FetchManager.newContext
 [ 2] kafka.server.KafkaApis.handleFetchRequest
 [ 3] kafka.server.KafkaApis.handle
 [ 4] kafka.server.KafkaRequestHandler.run
 [ 5] java.lang.Thread.run
 {noformat}
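A quick back-of-the-envelope reading of the trace above, written as a small Scala sketch. It assumes the trace header follows async-profiler's flat text format, i.e. total wait time in ns, this trace's share of all recorded lock wait, and the sample count. The object and method names are illustrative only.

```scala
// Back-of-the-envelope reading of the FetchSessionCache lock trace.
// Assumption: "--- <ns> (<share>%), <samples> samples" means total wait time,
// share of all lock wait recorded in the profile, and number of samples.
object LockProfileMath {
  val waitNs: Long = 25818577497L   // total time spent waiting on the FetchSessionCache monitor
  val share: Double = 0.2084        // this trace's share of all lock wait in the profile
  val samples: Long = 5805L         // contended-lock samples attributed to this trace
  val profileSeconds: Double = 60.0 // wall-clock length of the profile

  def waitSeconds: Double = waitNs / 1e9

  // Average number of request handler threads blocked on this lock at any instant
  // (total wait time divided by wall-clock time).
  def avgBlockedThreads: Double = waitSeconds / profileSeconds

  // Implied total lock wait across every trace in the profile.
  def totalLockWaitSeconds: Double = waitSeconds / share

  def main(args: Array[String]): Unit = {
    println(f"wait on FetchSessionCache: $waitSeconds%.1f s of $profileSeconds%.0f s")
    println(f"avg threads blocked:       $avgBlockedThreads%.2f")
    println(f"implied total lock wait:   $totalLockWaitSeconds%.1f s over $samples samples")
  }
}
```

In other words, on average a little under half a handler thread is parked on this one monitor at every instant of the profile, and that wait alone is about a fifth of all lock waiting recorded.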
FetchSession.scala:
{code:java}
  // Cache-wide lock, held for the whole session lookup and session update.
  cache.synchronized {
    cache.get(reqMetadata.sessionId) match {
      case None => {
        debug(s"Session error for ${reqMetadata.sessionId}: no such session ID found.")
        new SessionErrorContext(Errors.FETCH_SESSION_ID_NOT_FOUND, reqMetadata)
      }
      // Per-session lock, nested inside the cache-wide lock.
      case Some(session) => session.synchronized {
        if (session.epoch != reqMetadata.epoch) {
          debug(s"Session error for ${reqMetadata.sessionId}: expected epoch " +
            s"${session.epoch}, but got ${reqMetadata.epoch} instead.")
          new SessionErrorContext(Errors.INVALID_FETCH_SESSION_EPOCH, reqMetadata)
        } else {
          val (added, updated, removed) = session.update(fetchData, toForget, reqMetadata)
          if (session.isEmpty) {
            debug(s"Created a new sessionless FetchContext and closing session id ${session.id}, " +
              s"epoch ${session.epoch}: after removing ${partitionsToLogString(removed)}, " +
              s"there are no more partitions left.")
            cache.remove(session)
            new SessionlessFetchContext(fetchData)
          } else {
            // Since KAFKA-9137 this touch runs on every incremental fetch, still under the cache lock.
            cache.touch(session, time.milliseconds())
            session.epoch = JFetchMetadata.nextEpoch(session.epoch)
            debug(s"Created a new incremental FetchContext for session id ${session.id}, " +
              s"epoch ${session.epoch}: added ${partitionsToLogString(added)}, " +
              s"updated ${partitionsToLogString(updated)}, " +
              s"removed ${partitionsToLogString(removed)}")
            new IncrementalFetchContext(time, reqMetadata, session)
          }
        }
      }
    }
  }

{code}
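To make the lock nesting concrete, here is a hypothetical, stripped-down Scala model of the code above. ModelSession, ModelCache and the string results are illustrative names, not Kafka classes; session.update is reduced to dropping forgotten partitions, and nextEpoch is a simplified stand-in for JFetchMetadata.nextEpoch. The point it shows is that the cache monitor is held for the entire request, so requests for different sessions still serialize on one lock.

```scala
import scala.collection.mutable

// Hypothetical model of a fetch session: id, current epoch, partitions in the session.
final class ModelSession(val id: Int, var epoch: Int, val partitions: mutable.Set[String])

// Hypothetical model of FetchSessionCache; its methods are assumed to be synchronized.
final class ModelCache {
  private val sessions = mutable.HashMap.empty[Int, ModelSession]
  var touches: Int = 0 // counts touch() calls, all made while the cache lock is held

  def put(s: ModelSession): Unit = synchronized { sessions.put(s.id, s) }
  def get(id: Int): Option[ModelSession] = synchronized { sessions.get(id) }
  def remove(s: ModelSession): Unit = synchronized { sessions.remove(s.id) }
  def touch(s: ModelSession): Unit = synchronized { touches += 1 }

  // Simplified stand-in for JFetchMetadata.nextEpoch: wraps past Int.MaxValue to 1.
  private def nextEpoch(e: Int): Int = if (e == Int.MaxValue) 1 else e + 1

  // Same shape as the quoted newContext: cache lock outside, session lock inside.
  def newContext(id: Int, epoch: Int, toForget: Set[String]): String =
    this.synchronized {
      get(id) match {
        case None => "FETCH_SESSION_ID_NOT_FOUND"
        case Some(session) =>
          session.synchronized {
            if (session.epoch != epoch) "INVALID_FETCH_SESSION_EPOCH"
            else {
              session.partitions --= toForget // stand-in for session.update(...)
              if (session.partitions.isEmpty) { remove(session); "SESSIONLESS" }
              else { touch(session); session.epoch = nextEpoch(session.epoch); "INCREMENTAL" }
            }
          }
      }
    }
}
```

Because the session lock is only ever taken inside the cache lock in this shape, the per-session lock adds no concurrency: two handler threads working on unrelated sessions still wait on each other.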
Contention has been made worse by the fix for "KAFKA-9137: Fix incorrect FetchSessionCache eviction logic" ([https://github.com/apache/kafka/pull/7640]), because the cache is now correctly touched, whereas previously the touch was being skipped.
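Why the touch matters for both correctness and contention can be sketched as follows. This assumes the cache keeps an index ordered by (lastUsedMs, sessionId) and evicts from the least-recently-used end; LruIndex is a hypothetical name and Kafka's real FetchSessionCache internals may differ. Skipping the touch leaves an active session looking stale (so it can be wrongly evicted), while doing it adds a sorted-set remove and re-insert to the work done under the cache-wide lock on every incremental fetch.

```scala
import scala.collection.mutable

// Hypothetical LRU index; the real FetchSessionCache structure may differ.
final class LruIndex {
  private val lastUsedMs = mutable.HashMap.empty[Int, Long]
  private val byLastUsed = mutable.TreeSet.empty[(Long, Int)] // ordered by time, then id

  // Insert or re-key a session at time nowMs.
  def insert(id: Int, nowMs: Long): Unit = synchronized {
    lastUsedMs.remove(id).foreach(old => byLastUsed -= ((old, id)))
    lastUsedMs.put(id, nowMs)
    byLastUsed += ((nowMs, id))
  }

  // A touch is an O(log n) remove plus re-insert, done while holding the lock.
  // Unknown session ids are ignored.
  def touch(id: Int, nowMs: Long): Unit = synchronized {
    if (lastUsedMs.contains(id)) insert(id, nowMs)
  }

  // Least recently used session: the first candidate for eviction.
  def evictionCandidate: Option[Int] = synchronized { byLastUsed.headOption.map(_._2) }
}
```

For example, after inserting session 1 at t=100 and session 2 at t=200, session 1 is the eviction candidate; touching session 1 at t=300 makes session 2 the candidate instead. Without the touch, session 1 would remain first in line for eviction even though it is in active use.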


> High lock contention for kafka.server.FetchManager.newContext
> -------------------------------------------------------------
>
>                 Key: KAFKA-9401
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9401
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Lucas Bradstreet
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)