Posted to dev@kafka.apache.org by "shilin Lu (Jira)" <ji...@apache.org> on 2020/04/22 13:49:00 UTC

[jira] [Created] (KAFKA-9903) Kafka ShutdownableThread: isRunning check of thread status has a bug

shilin Lu created KAFKA-9903:
--------------------------------

             Summary: Kafka ShutdownableThread: isRunning check of thread status has a bug
                 Key: KAFKA-9903
                 URL: https://issues.apache.org/jira/browse/KAFKA-9903
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 2.3.1
            Reporter: shilin Lu
         Attachments: image-2020-04-22-21-28-03-154.png

h2. 1. Bug
{code:java}
override def run(): Unit = {
  isStarted = true
  info("Starting")
  try {
    while (isRunning)
      doWork()
  } catch {
    case e: FatalExitError =>
      shutdownInitiated.countDown()
      shutdownComplete.countDown()
      info("Stopped")
      Exit.exit(e.statusCode())
    case e: Throwable =>
      if (isRunning)
        error("Error due to", e)
  } finally {
    // Only shutdownComplete is counted down here; shutdownInitiated keeps its count of 1
    shutdownComplete.countDown()
  }
  info("Stopped")
}

def isRunning: Boolean = {
  shutdownInitiated.getCount() != 0
}{code}
1. When a replica thread throws an exception that is not a FatalExitError, the thread exits and runs the finally block, which counts down the shutdownComplete CountDownLatch. shutdownInitiated, however, is never counted down.

2. As a result of 1, shutdownInitiated keeps its count of 1. isRunning decides whether the thread is running only by checking shutdownInitiated.getCount() != 0, so it keeps returning true for a thread that has already exited. Judging the thread's status through this method is therefore wrong.

3. isRunning is used in shutdownIdleFetcherThreads, processFetchRequest, the controller request send path, and elsewhere, so a dead thread may never be removed and work that depends on it may never get done.
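
The behaviour in points 1 and 2 can be reproduced outside Kafka with a minimal sketch. MiniThread below is a hypothetical, stripped-down stand-in for ShutdownableThread (not the Kafka class itself): it keeps only the two latches and the run loop, and its finally block counts down shutdownComplete alone. The class name, the work parameter and the demo object are assumptions made for illustration.
{code:scala}
import java.util.concurrent.CountDownLatch

// Hypothetical stand-in for ShutdownableThread; not the Kafka class.
class MiniThread(work: () => Unit) extends Thread("mini-thread") {
  private val shutdownInitiated = new CountDownLatch(1)
  private val shutdownComplete = new CountDownLatch(1)

  def isRunning: Boolean = shutdownInitiated.getCount() != 0

  override def run(): Unit = {
    try {
      while (isRunning)
        work()
    } catch {
      case _: Throwable => // non-fatal error; swallowed in this sketch
    } finally {
      // Only shutdownComplete is counted down; shutdownInitiated stays at 1.
      shutdownComplete.countDown()
    }
  }
}

object IsRunningBugDemo {
  def main(args: Array[String]): Unit = {
    // The work function fails on its first call, like a non-fatal error in doWork().
    val t = new MiniThread(() => throw new IllegalStateException("boom"))
    t.start()
    t.join() // wait until the thread has really exited
    println(s"thread alive: ${t.isAlive}, isRunning: ${t.isRunning}")
  }
}
{code}
In this sketch the thread is no longer alive after join(), yet isRunning still returns true, which is the mismatch that callers such as shutdownIdleFetcherThreads would observe.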
h2. 2. Bugfix

Count down shutdownInitiated in the finally block as well, as in the following code:
{code:java}
override def run(): Unit = {
  isStarted = true
  info("Starting")
  try {
    while (isRunning)
      doWork()
  } catch {
    case e: FatalExitError =>
      shutdownInitiated.countDown()
      shutdownComplete.countDown()
      info("Stopped")
      Exit.exit(e.statusCode())
    case e: Throwable =>
      if (isRunning)
        error("Error due to", e)
  } finally {
    // Also count down shutdownInitiated so that isRunning returns false once the thread has exited
    shutdownInitiated.countDown()
    shutdownComplete.countDown()
  }
  info("Stopped")
}
{code}
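
As a rough check of the proposed change, the sketch below runs a hypothetical stand-in for ShutdownableThread whose finally block counts down both latches. FixedMiniThread, its work parameter and the demo object are names invented for illustration; only the latch handling mirrors the code above.
{code:scala}
import java.util.concurrent.CountDownLatch

// Hypothetical stand-in for ShutdownableThread with the proposed finally block.
class FixedMiniThread(work: () => Unit) extends Thread("fixed-mini-thread") {
  private val shutdownInitiated = new CountDownLatch(1)
  private val shutdownComplete = new CountDownLatch(1)

  def isRunning: Boolean = shutdownInitiated.getCount() != 0

  override def run(): Unit = {
    try {
      while (isRunning)
        work()
    } catch {
      case _: Throwable => // non-fatal error; swallowed in this sketch
    } finally {
      // countDown() on a latch that is already at zero does nothing,
      // so this is safe even if shutdown was initiated earlier.
      shutdownInitiated.countDown()
      shutdownComplete.countDown()
    }
  }
}

object IsRunningFixDemo {
  def main(args: Array[String]): Unit = {
    // The work function fails on its first call, like a non-fatal error in doWork().
    val t = new FixedMiniThread(() => throw new IllegalStateException("boom"))
    t.start()
    t.join() // wait until the thread has really exited
    println(s"thread alive: ${t.isAlive}, isRunning: ${t.isRunning}")
  }
}
{code}
Here isRunning returns false once the thread has exited, so a caller that polls it sees the same state as Thread.isAlive.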

--
This message was sent by Atlassian Jira
(v8.3.4#803005)