You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/30 10:17:54 UTC

kafka git commit: MINOR: Shutdown ControllerEventThread via event instead of interruption

Repository: kafka
Updated Branches:
  refs/heads/trunk 38c5d7fba -> bc2c17a9b


MINOR: Shutdown ControllerEventThread via event instead of interruption

If the ControllerEventThread is interrupted when a request is
being sent, it may lead to an IllegalStateException being thrown.
This, in turn, can lead to a NullPointerException in
unregisterPartitionReassignmentIsrChangeHandlers.

To avoid these issues, we make the ControllerEventThread
uninterruptible and we shut it down by clearing the queue
and enqueuing a special event.

To make the code more robust, we also set
ReassignedPartitionsContext.reassignIsrChangeHandler
during construction instead of setting it to null first.

Finally, misleading log messages in ephemeral node
creation have been clarified.

For reference, here are the relevant log lines from the
flaky test:

```text
[2017-11-15 10:30:13,869] ERROR Error while creating ephemeral at /controller with return code: OK (kafka.zk.KafkaZkClient$CheckedEphemeral:101)
[2017-11-15 10:30:14,155] ERROR Haven't been able to send leader and isr requests, current state of the map is Map(101 -> Map(topic1-0 -> PartitionState(controllerEpoch=2, leader=101, leaderEpoch=3, isr=101, zkVersion=3, replicas=100,102,101, isNew=false)), 100 -> Map(topic1-0 -> PartitionState(controllerEpoch=2, leader=101, leaderEpoch=3, isr=101, zkVersion=3, replicas=100,102,101, isNew=false)), 102 -> Map(topic1-0 -> PartitionState(controllerEpoch=2, leader=101, leaderEpoch=3, isr=101, zkVersion=3, replicas=100,102,101, isNew=false))). Exception message: java.lang.InterruptedException (kafka.controller.ControllerBrokerRequestBatch:101)
[2017-11-15 10:30:14,156] ERROR Haven't been able to send metadata update requests to brokers Set(102, 103, 104, 101, 105), current state of the partition info is Map(topic1-0 -> PartitionState(controllerEpoch=1, leader=101, leaderEpoch=2, isr=[101], zkVersion=2, replicas=[100, 102, 101], offlineReplicas=[100])). Exception message: java.lang.InterruptedException (kafka.controller.ControllerBrokerRequestBatch:101)
[2017-11-15 10:30:14,158] ERROR [Controller id=101] Forcing the controller to resign (kafka.controller.KafkaController:101)
[2017-11-15 10:30:14,158] ERROR [Controller id=101] Error completing reassignment of partition topic1-0 (kafka.controller.KafkaController:107)
java.lang.NullPointerException
	at kafka.controller.KafkaController$$anonfun$unregisterPartitionReassignmentIsrChangeHandlers$1.apply(KafkaController.scala:784)
	at kafka.controller.KafkaController$$anonfun$unregisterPartitionReassignmentIsrChangeHandlers$1.apply(KafkaController.scala:783)
```

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #4219 from ijuma/fix-npe-unregister-zk-listener


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc2c17a9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc2c17a9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc2c17a9

Branch: refs/heads/trunk
Commit: bc2c17a9b85c8cba546ec41129de8055d46f740d
Parents: 38c5d7f
Author: Ismael Juma <is...@juma.me.uk>
Authored: Thu Nov 30 12:17:34 2017 +0200
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Nov 30 12:17:34 2017 +0200

----------------------------------------------------------------------
 .../controller/ControllerEventManager.scala     | 42 +++++++++-------
 .../kafka/controller/ControllerState.scala      |  6 ++-
 .../kafka/controller/KafkaController.scala      | 52 ++++++++++++--------
 .../scala/kafka/tools/ProducerPerformance.scala |  1 -
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  4 +-
 .../src/main/scala/kafka/zk/KafkaZkClient.scala | 48 +++++++-----------
 6 files changed, 80 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bc2c17a9/core/src/main/scala/kafka/controller/ControllerEventManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index 396a39d..8ccbfb5 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -41,7 +41,10 @@ class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer
 
   def start(): Unit = thread.start()
 
-  def close(): Unit = thread.shutdown()
+  def close(): Unit = {
+    clearAndPut(KafkaController.ShutdownEventThread)
+    thread.awaitShutdown()
+  }
 
   def put(event: ControllerEvent): Unit = inLock(putLock) {
     queue.put(event)
@@ -52,25 +55,28 @@ class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer
     queue.put(event)
   }
 
-  class ControllerEventThread(name: String) extends ShutdownableThread(name = name) {
+  class ControllerEventThread(name: String) extends ShutdownableThread(name = name, isInterruptible = false) {
     override def doWork(): Unit = {
-      val controllerEvent = queue.take()
-      _state = controllerEvent.state
-
-      try {
-        rateAndTimeMetrics(state).time {
-          controllerEvent.process()
-        }
-      } catch {
-        case e: Throwable => error(s"Error processing event $controllerEvent", e)
-      }
-
-      try eventProcessedListener(controllerEvent)
-      catch {
-        case e: Throwable => error(s"Error while invoking listener for processed event $controllerEvent", e)
+      queue.take() match {
+        case KafkaController.ShutdownEventThread => initiateShutdown()
+        case controllerEvent =>
+          _state = controllerEvent.state
+
+          try {
+            rateAndTimeMetrics(state).time {
+              controllerEvent.process()
+            }
+          } catch {
+            case e: Throwable => error(s"Error processing event $controllerEvent", e)
+          }
+
+          try eventProcessedListener(controllerEvent)
+          catch {
+            case e: Throwable => error(s"Error while invoking listener for processed event $controllerEvent", e)
+          }
+
+          _state = ControllerState.Idle
       }
-
-      _state = ControllerState.Idle
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc2c17a9/core/src/main/scala/kafka/controller/ControllerState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala
index 2bb63e8..17af777 100644
--- a/core/src/main/scala/kafka/controller/ControllerState.scala
+++ b/core/src/main/scala/kafka/controller/ControllerState.scala
@@ -86,7 +86,11 @@ object ControllerState {
     def value = 11
   }
 
+  case object ControllerShutdown extends ControllerState {
+    def value = 12
+  }
+
   val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion,
     PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived,
-    LogDirChange)
+    LogDirChange, ControllerShutdown)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc2c17a9/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 3615b7d..42c66f6 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -43,6 +43,15 @@ import scala.util.Try
 object KafkaController extends Logging {
   val InitialControllerEpoch = 1
   val InitialControllerEpochZkVersion = 1
+
+  /**
+   * ControllerEventThread will shutdown once it sees this event
+   */
+  private[controller] case object ShutdownEventThread extends ControllerEvent {
+    def state = ControllerState.ControllerShutdown
+    override def process(): Unit = ()
+  }
+
 }
 
 class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
@@ -489,16 +498,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   }
 
-  private def watchIsrChangesForReassignedPartition(partition: TopicPartition,
-                                                    reassignedPartitionContext: ReassignedPartitionsContext) {
-    val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this, eventManager, partition)
-    reassignedPartitionContext.reassignIsrChangeHandler = reassignIsrChangeHandler
-    // register listener on the leader and isr path to wait until they catch up with the current leader
-    zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
-  }
-
   private def initiateReassignReplicasForTopicPartition(topicPartition: TopicPartition,
-                                                reassignedPartitionContext: ReassignedPartitionsContext) {
+                                                        reassignedPartitionContext: ReassignedPartitionsContext) {
     val newReplicas = reassignedPartitionContext.newReplicas
     val topic = topicPartition.topic
     try {
@@ -511,7 +512,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
           } else {
             info(s"Handling reassignment of partition $topicPartition to new replicas ${newReplicas.mkString(",")}")
             // first register ISR change listener
-            watchIsrChangesForReassignedPartition(topicPartition, reassignedPartitionContext)
+            reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient)
             controllerContext.partitionsBeingReassigned.put(topicPartition, reassignedPartitionContext)
             // mark topic ineligible for deletion for the partitions being reassigned
             topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
@@ -632,7 +633,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }.keys
     reassignedPartitions.foreach(removePartitionFromReassignedPartitions)
     val partitionsToReassign = partitionsBeingReassigned -- reassignedPartitions
-    controllerContext.partitionsBeingReassigned ++= partitionsToReassign.mapValues(new ReassignedPartitionsContext(_))
+    controllerContext.partitionsBeingReassigned ++= partitionsToReassign.map { case (tp, newReplicas) =>
+      val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this, eventManager, tp)
+      tp -> new ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler)
+    }
     info(s"Partitions being reassigned: $partitionsBeingReassigned")
     info(s"Partitions already reassigned: $reassignedPartitions")
     info(s"Resuming reassignment of partitions: $partitionsToReassign")
@@ -773,15 +777,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   private[controller] def unregisterPartitionModificationsHandlers(topics: Seq[String]) = {
     topics.foreach { topic =>
-      partitionModificationsHandlers.remove(topic)
-        .foreach(handler => zkClient.unregisterZNodeChangeHandler(handler.path))
+      partitionModificationsHandlers.remove(topic).foreach(handler => zkClient.unregisterZNodeChangeHandler(handler.path))
     }
   }
 
   private def unregisterPartitionReassignmentIsrChangeHandlers() {
-    controllerContext.partitionsBeingReassigned.values.foreach { reassignedPartitionsContext =>
-      zkClient.unregisterZNodeChangeHandler(reassignedPartitionsContext.reassignIsrChangeHandler.path)
-    }
+    controllerContext.partitionsBeingReassigned.values.foreach(_.unregisterReassignIsrChangeHandler(zkClient))
   }
 
   private def readControllerEpochFromZooKeeper() {
@@ -796,8 +797,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   private def removePartitionFromReassignedPartitions(topicPartition: TopicPartition) {
     controllerContext.partitionsBeingReassigned.get(topicPartition).foreach { reassignContext =>
-      // stop watching the ISR changes for this partition
-      zkClient.unregisterZNodeChangeHandler(reassignContext.reassignIsrChangeHandler.path)
+      reassignContext.unregisterReassignIsrChangeHandler(zkClient)
     }
 
     val updatedPartitionsBeingReassigned = controllerContext.partitionsBeingReassigned - topicPartition
@@ -1282,12 +1282,14 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       if (zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) {
         val partitionReassignment = zkClient.getPartitionReassignment
         val partitionsToBeReassigned = partitionReassignment -- controllerContext.partitionsBeingReassigned.keys
-        partitionsToBeReassigned.foreach { case (tp, context) =>
+        partitionsToBeReassigned.foreach { case (tp, newReplicas) =>
           if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {
             error(s"Skipping reassignment of $tp since the topic is currently being deleted")
             removePartitionFromReassignedPartitions(tp)
           } else {
-            initiateReassignReplicasForTopicPartition(tp, ReassignedPartitionsContext(context))
+            val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(KafkaController.this, eventManager,
+              tp)
+            initiateReassignReplicasForTopicPartition(tp, ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler))
           }
         }
       }
@@ -1488,7 +1490,15 @@ class ControllerChangeHandler(controller: KafkaController, eventManager: Control
 }
 
 case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
-                                       var reassignIsrChangeHandler: PartitionReassignmentIsrChangeHandler = null)
+                                       val reassignIsrChangeHandler: PartitionReassignmentIsrChangeHandler) {
+
+  def registerReassignIsrChangeHandler(zkClient: KafkaZkClient): Unit =
+    zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
+
+  def unregisterReassignIsrChangeHandler(zkClient: KafkaZkClient): Unit =
+    zkClient.unregisterZNodeChangeHandler(reassignIsrChangeHandler.path)
+
+}
 
 case class PartitionAndReplica(topicPartition: TopicPartition, replica: Int) {
   def topic: String = topicPartition.topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc2c17a9/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index 365c9af..f96200d 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -31,7 +31,6 @@ import java.math.BigInteger
 import java.nio.charset.StandardCharsets
 
 import org.apache.kafka.common.utils.Utils
-import org.slf4j.LoggerFactory
 
 /**
  * Load test for the producer

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc2c17a9/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 60c2adf..004a408 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -821,7 +821,9 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
     jsonPartitionMapOpt match {
       case Some(jsonPartitionMap) =>
         val reassignedPartitions = parsePartitionReassignmentData(jsonPartitionMap)
-        reassignedPartitions.map(p => p._1 -> new ReassignedPartitionsContext(p._2))
+        reassignedPartitions.map { case (tp, newReplicas) =>
+          tp -> new ReassignedPartitionsContext(newReplicas, null)
+        }
       case None => Map.empty[TopicAndPartition, ReassignedPartitionsContext]
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc2c17a9/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 110d676..d4c7daa 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -21,7 +21,7 @@ import java.util.Properties
 
 import kafka.api.LeaderAndIsr
 import kafka.cluster.Broker
-import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
+import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.log.LogConfig
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
 import kafka.security.auth.{Acl, Resource, ResourceType}
@@ -681,14 +681,6 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
-   * Gets the partitions being reassigned for given topics
-   * @return ReassignedPartitionsContexts for each topic which are being reassigned.
-   */
-  def getPartitionsBeingReassigned(): Map[TopicPartition, ReassignedPartitionsContext] = {
-    getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
-  }
-
-  /**
    * Gets topic partition states for the given partitions.
    * @param partitions the partitions for which we want to get states.
    * @return map containing LeaderIsrAndControllerEpoch of each partition for we were able to lookup the partition state.
@@ -1285,43 +1277,37 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     info(s"Creating $path (is it secure? $isSecure)")
     val code = checkedEphemeral.create()
     info(s"Result of znode creation at $path is: $code")
-    code match {
-      case Code.OK =>
-      case _ => throw KeeperException.create(code)
-    }
+    if (code != Code.OK)
+      throw KeeperException.create(code)
   }
 
   private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
     def create(): Code = {
       val createRequest = CreateRequest(path, data, acls(path), CreateMode.EPHEMERAL)
       val createResponse = retryRequestUntilConnected(createRequest)
-      val code = createResponse.resultCode
-      code match {
-        case Code.OK => code
-        case Code.NODEEXISTS => get()
-        case _ =>
+      createResponse.resultCode match {
+        case code@ Code.OK => code
+        case Code.NODEEXISTS => getAfterNodeExists()
+        case code =>
           error(s"Error while creating ephemeral at $path with return code: $code")
           code
       }
     }
 
-    private def get(): Code = {
+    private def getAfterNodeExists(): Code = {
       val getDataRequest = GetDataRequest(path)
       val getDataResponse = retryRequestUntilConnected(getDataRequest)
-      val code = getDataResponse.resultCode
-      code match {
-        case Code.OK =>
-          if (getDataResponse.stat.getEphemeralOwner != zooKeeperClient.sessionId) {
-            error(s"Error while creating ephemeral at $path with return code: $code")
-            Code.NODEEXISTS
-          } else {
-            code
-          }
+      getDataResponse.resultCode match {
+        case Code.OK if getDataResponse.stat.getEphemeralOwner != zooKeeperClient.sessionId =>
+          error(s"Error while creating ephemeral at $path, node already exists and owner " +
+            s"'${getDataResponse.stat.getEphemeralOwner}' does not match current session '${zooKeeperClient.sessionId}'")
+          Code.NODEEXISTS
+        case code@ Code.OK => code
         case Code.NONODE =>
-          info(s"The ephemeral node at $path went away while reading it")
+          info(s"The ephemeral node at $path went away while reading it, attempting create() again")
           create()
-        case _ =>
-          error(s"Error while creating ephemeral at $path with return code: $code")
+        case code =>
+          error(s"Error while creating ephemeral at $path as it already exists and error getting the node data due to $code")
           code
       }
     }