You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text copy.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:04:05 UTC

[20/36] git commit: KAFKA-999 Controlled shutdown never succeeds until the broker is killed; reviewed by Neha Narkhede

KAFKA-999 Controlled shutdown never succeeds until the broker is killed; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9753d15e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9753d15e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9753d15e

Branch: refs/heads/trunk
Commit: 9753d15ef1da0ecab9bbee51d8cdc08c2159c50b
Parents: 1d6ad3d
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Tue Aug 6 20:41:59 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Aug 6 20:42:08 2013 -0700

----------------------------------------------------------------------
 .../scala/kafka/api/LeaderAndIsrRequest.scala   | 14 ++++++-------
 .../main/scala/kafka/cluster/Partition.scala    | 21 ++++++++++----------
 .../controller/ControllerChannelManager.scala   |  7 +++----
 .../kafka/controller/KafkaController.scala      |  4 ++--
 .../controller/PartitionStateMachine.scala      |  4 ++--
 .../kafka/controller/ReplicaStateMachine.scala  |  2 +-
 .../scala/kafka/server/ReplicaManager.scala     |  6 +++---
 7 files changed, 29 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index a474474..981d2bb 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -135,13 +135,13 @@ case class LeaderAndIsrRequest (versionId: Short,
                                 controllerId: Int,
                                 controllerEpoch: Int,
                                 partitionStateInfos: Map[(String, Int), PartitionStateInfo],
-                                aliveLeaders: Set[Broker])
+                                leaders: Set[Broker])
     extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey), correlationId) {
 
-  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], aliveLeaders: Set[Broker], controllerId: Int,
+  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[Broker], controllerId: Int,
            controllerEpoch: Int, correlationId: Int, clientId: String) = {
     this(LeaderAndIsrRequest.CurrentVersion, correlationId, clientId,
-         controllerId, controllerEpoch, partitionStateInfos, aliveLeaders)
+         controllerId, controllerEpoch, partitionStateInfos, leaders)
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -156,8 +156,8 @@ case class LeaderAndIsrRequest (versionId: Short,
       buffer.putInt(key._2)
       value.writeTo(buffer)
     }
-    buffer.putInt(aliveLeaders.size)
-    aliveLeaders.foreach(_.writeTo(buffer))
+    buffer.putInt(leaders.size)
+    leaders.foreach(_.writeTo(buffer))
   }
 
   def sizeInBytes(): Int = {
@@ -171,7 +171,7 @@ case class LeaderAndIsrRequest (versionId: Short,
     for((key, value) <- partitionStateInfos)
       size += (2 + key._1.length) /* topic */ + 4 /* partition */ + value.sizeInBytes /* partition state info */
     size += 4 /* number of leader brokers */
-    for(broker <- aliveLeaders)
+    for(broker <- leaders)
       size += broker.sizeInBytes /* broker info */
     size
   }
@@ -185,7 +185,7 @@ case class LeaderAndIsrRequest (versionId: Short,
     leaderAndIsrRequest.append(";CorrelationId:" + correlationId)
     leaderAndIsrRequest.append(";ClientId:" + clientId)
     leaderAndIsrRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
-    leaderAndIsrRequest.append(";Leaders:" + aliveLeaders.mkString(","))
+    leaderAndIsrRequest.append(";Leaders:" + leaders.mkString(","))
     leaderAndIsrRequest.toString()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 88fc8dd..a9bb3c8 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -167,7 +167,7 @@ class Partition(val topic: String,
    *  4. start a fetcher to the new leader
    */
   def makeFollower(controllerId: Int, topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
-                   aliveLeaders: Set[Broker], correlationId: Int): Boolean = {
+                   leaders: Set[Broker], correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       if (leaderEpoch >= leaderAndIsr.leaderEpoch) {
@@ -186,7 +186,8 @@ class Partition(val topic: String,
       // on the leader
       val localReplica = getOrCreateReplica()
       val newLeaderBrokerId: Int = leaderAndIsr.leader
-      aliveLeaders.find(_.id == newLeaderBrokerId) match {
+      // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
+      leaders.find(_.id == newLeaderBrokerId) match {
         case Some(leaderBroker) =>
           // stop fetcher thread to previous leader
           replicaFetcherManager.removeFetcher(topic, partitionId)
@@ -200,15 +201,15 @@ class Partition(val topic: String,
             replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
           }
           else {
-            stateChangeLogger.trace("Broker %d ignored the become-follower state change with correlation id %d from " +
-              " controller %d epoch %d since it is shutting down"
-                .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch))
+            stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " +
+                                     "controller %d epoch %d since it is shutting down")
+                                      .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch))
           }
-        case None => // leader went down
-          stateChangeLogger.trace("Broker %d aborted the become-follower state change with correlation id %d from " +
-            " controller %d epoch %d since leader %d for partition [%s,%d] is unavailable during the state change operation"
-                                     .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
-                                              newLeaderBrokerId, topic, partitionId))
+        case None => // we should not come here
+          stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " +
+                                   "controller %d epoch %d for partition [%s,%d] new leader %d")
+                                    .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
+                                            topic, partitionId, newLeaderBrokerId))
       }
       true
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 38b8674..ed1ce0b 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -229,14 +229,13 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
     }
   }
 
-  def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int, liveBrokers: Set[Broker]) {
+  def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int) {
     leaderAndIsrRequestMap.foreach { m =>
       val broker = m._1
       val partitionStateInfos = m._2.toMap
       val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
-      val aliveLeaders = liveBrokers.filter(b => leaderIds.contains(b.id))
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, aliveLeaders, controllerId, controllerEpoch, correlationId,
-                                                        clientId)
+      val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id))
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId)
       for (p <- partitionStateInfos) {
         val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
         stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request with correlationId %d to broker %d " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index b07e27b..c87caab 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -168,7 +168,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
               // before which the stop replica request should be completed (in most cases)
               brokerRequestBatch.newBatch()
               brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, topicAndPartition.partition, deletePartition = false)
-              brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
+              brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
 
               // If the broker is a follower, updates the isr in ZK and notifies the current leader
               replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
@@ -656,7 +656,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   private def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
     brokerRequestBatch.newBatch()
     brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
-    brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
+    brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 0135d45..a084830 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -89,7 +89,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
           handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector)
       }
-      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
     } catch {
       case e => error("Error while moving some partitions to the online state", e)
       // TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions
@@ -109,7 +109,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       partitions.foreach { topicAndPartition =>
         handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector)
       }
-      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
     }catch {
       case e => error("Error while moving some partitions to %s state".format(targetState), e)
       // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions

http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 0c9d436..9f752f4 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -87,7 +87,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     try {
       brokerRequestBatch.newBatch()
       replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
-      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
     }catch {
       case e => error("Error while moving some replicas to %s state".format(targetState), e)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d885ba1..f551243 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -220,7 +220,7 @@ class ReplicaManager(val config: KafkaConfig,
           if(requestedLeaderId == config.brokerId)
             makeLeader(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.correlationId)
           else
-            makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.aliveLeaders,
+            makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders,
                          leaderAndISRRequest.correlationId)
         } catch {
           case e =>
@@ -264,14 +264,14 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   private def makeFollower(controllerId: Int, epoch: Int, topic: String, partitionId: Int,
-                           partitionStateInfo: PartitionStateInfo, aliveLeaders: Set[Broker], correlationId: Int) {
+                           partitionStateInfo: PartitionStateInfo, leaders: Set[Broker], correlationId: Int) {
     val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
     stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
                              "starting the become-follower transition for partition [%s,%d]")
                                .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId))
 
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, aliveLeaders, correlationId)) {
+    if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, leaders, correlationId)) {
       // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
         leaderPartitions -= partition