Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/26 07:04:36 UTC

[15/28] git commit: KAFKA-708 ISR becomes empty while marking a partition offline; reviewed by Jun Rao

KAFKA-708 ISR becomes empty while marking a partition offline; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eb0d5a7f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eb0d5a7f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eb0d5a7f

Branch: refs/heads/trunk
Commit: eb0d5a7f815ac51e3bdcf7bb2c1d3af9f33f2d83
Parents: 22cd8bf
Author: Neha Narkhede <ne...@gmail.com>
Authored: Wed Jan 23 13:47:27 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Jan 23 13:47:27 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/api/LeaderAndIsrRequest.scala |    2 +-
 .../scala/kafka/controller/KafkaController.scala   |   26 +++---
 .../kafka/controller/PartitionLeaderSelector.scala |   47 +++++-----
 .../kafka/controller/PartitionStateMachine.scala   |   26 +++---
 .../kafka/controller/ReplicaStateMachine.scala     |   71 ++++++++-------
 core/src/main/scala/kafka/utils/Logging.scala      |    2 +-
 core/src/main/scala/kafka/utils/Utils.scala        |    2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala      |   11 ++-
 .../test/scala/unit/kafka/utils/UtilsTest.scala    |   12 +++
 9 files changed, 105 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/eb0d5a7f/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 99af002..6955433 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -48,7 +48,7 @@ object PartitionStateInfo {
     val leader = buffer.getInt
     val leaderEpoch = buffer.getInt
     val isrString = readShortString(buffer)
-    val isr = isrString.split(",").map(_.toInt).toList
+    val isr = Utils.parseCsvList(isrString).map(_.toInt).toList
     val zkVersion = buffer.getInt
     val replicationFactor = buffer.getInt
     PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr, zkVersion), controllerEpoch),
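
For context on the one-line change above: in Scala, as in Java, "".split(",") returns Array(""), so the old isrString.split(",").map(_.toInt) throws a NumberFormatException whenever the serialized ISR in the request is empty, which is exactly the state this patch has to tolerate. A minimal, self-contained sketch (plain Scala, not using any Kafka classes) of the failure mode that the switch to Utils.parseCsvList avoids:

    object EmptyIsrSplitSketch extends App {
      // "".split(",") yields Array(""), so mapping toInt over it blows up.
      val emptyIsrString = ""
      try {
        val isr = emptyIsrString.split(",").map(_.toInt).toList
        println("parsed ISR: " + isr)
      } catch {
        case e: NumberFormatException =>
          println("old parsing fails on an empty ISR string: " + e)
      }
    }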

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb0d5a7f/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 565c53a..f1a12c0 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -671,7 +671,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
               "means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
               "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
           if (leaderAndIsr.isr.contains(replicaId)) {
-            val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
+            // if the replica to be removed from the ISR is also the leader, set the new leader value to -1
+            val newLeader = if(replicaId == leaderAndIsr.leader) -1 else leaderAndIsr.leader
+            val newLeaderAndIsr = new LeaderAndIsr(newLeader, leaderAndIsr.leaderEpoch + 1,
               leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
             // update the new leadership decision in zookeeper or retry
             val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
@@ -683,8 +685,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
 
             finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
             if (updateSucceeded)
-              info("New leader and ISR for partition [%s, %d] is %s"
-                   .format(topic, partition, newLeaderAndIsr.toString()))
+              info("New leader and ISR for partition %s is %s".format(topicAndPartition, newLeaderAndIsr.toString()))
             updateSucceeded
           } else {
             warn("Cannot remove replica %d from ISR of %s. Leader = %d ; ISR = %s"
@@ -721,7 +722,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         partitionStateMachine.shutdown()
         replicaStateMachine.shutdown()
         if(controllerContext.controllerChannelManager != null) {
-          info("session expires, clean up the state")
           controllerContext.controllerChannelManager.shutdown()
           controllerContext.controllerChannelManager = null
         }
@@ -766,13 +766,11 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
           assignedReplicasOpt match {
             case Some(assignedReplicas) =>
               if(assignedReplicas == newReplicas) {
-                throw new KafkaException("Partition %s to be reassigned is already assigned to replicas"
-                  .format(topicAndPartition) +
+                throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
                   " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
               } else {
                 if(aliveNewReplicas == newReplicas) {
-                  info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition,
-                    newReplicas.mkString(",")))
+                  info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
                   val context = createReassignmentContextForPartition(topic, partition, newReplicas)
                   controllerContext.partitionsBeingReassigned.put(topicAndPartition, context)
                   controller.onPartitionReassignment(topicAndPartition, context)
@@ -851,18 +849,18 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
                 val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
                 if(caughtUpReplicas == reassignedReplicas) {
                   // resume the partition reassignment process
-                  info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned."
-                    .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) +
+                  info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
+                    .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
                     "Resuming partition reassignment")
                   controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
                 }
                 else {
-                  info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned."
-                    .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) +
+                  info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
+                    .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
                     "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
                 }
-              case None => error("Error handling reassignment of partition [%s, %d] to replicas %s as it was never created"
-                .format(topic, partition, reassignedReplicas.mkString(",")))
+              case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
+                .format(topicAndPartition, reassignedReplicas.mkString(",")))
             }
           case None =>
         }
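
The newLeader computation in the first KafkaController hunk above is the heart of the controller-side fix: when the replica being dropped from the ISR is also the current leader, the leader field is set to -1 (no leader) instead of continuing to point at a dead broker. A minimal sketch of just that decision, using a simplified stand-in for Kafka's LeaderAndIsr class rather than the real one:

    case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

    object RemoveReplicaFromIsrSketch extends App {
      val NoLeader = -1

      // Mirrors the patched logic: drop replicaId from the ISR and, if it was
      // the leader, mark the partition leaderless so a new leader gets elected.
      def shrinkIsr(current: LeaderAndIsr, replicaId: Int): LeaderAndIsr = {
        val newLeader = if (replicaId == current.leader) NoLeader else current.leader
        LeaderAndIsr(newLeader, current.leaderEpoch + 1,
                     current.isr.filter(_ != replicaId), current.zkVersion + 1)
      }

      // Leader (broker 1) dies: ISR shrinks to List(2) and leader becomes -1.
      println(shrinkIsr(LeaderAndIsr(1, 5, List(1, 2), 7), replicaId = 1))
      // A follower (broker 2) dies: leader stays 1, ISR shrinks to List(1).
      println(shrinkIsr(LeaderAndIsr(1, 5, List(1, 2), 7), replicaId = 2))
    }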

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb0d5a7f/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 3eb23cd..3ed9b7e 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -23,15 +23,14 @@ import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOff
 trait PartitionLeaderSelector {
 
   /**
-   * @param topic                      The topic of the partition whose leader needs to be elected
-   * @param partition                  The partition whose leader needs to be elected
+   * @param topicAndPartition          The topic and partition whose leader needs to be elected
    * @param currentLeaderAndIsr        The current leader and isr of input partition read from zookeeper
    * @throws PartitionOfflineException If no replica in the assigned replicas list is alive
    * @return The leader and isr request, with the newly selected leader info, to send to the brokers
    * Also, returns the list of replicas the returned leader and isr request should be sent to
    * This API selects a new leader for the input partition
    */
-  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
+  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
 
 }
 
@@ -45,8 +44,8 @@ trait PartitionLeaderSelector {
 class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
   this.logIdent = "[OfflinePartitionLeaderSelector]: "
 
-  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
-    controllerContext.partitionReplicaAssignment.get(TopicAndPartition(topic, partition)) match {
+  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+    controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
       case Some(assignedReplicas) =>
         val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
         val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
@@ -60,7 +59,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
               case true =>
                 ControllerStats.offlinePartitionRate.mark()
                 throw new PartitionOfflineException(("No replica for partition " +
-                  "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, controllerContext.liveBrokerIds)) +
+                  "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                   " Assigned replicas are: [%s]".format(assignedReplicas))
               case false =>
                 ControllerStats.uncleanLeaderElectionRate.mark()
@@ -74,13 +73,11 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
             debug("Some broker in ISR is alive, selecting the leader from the ISR: " + newLeader)
             new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
         }
-        info("Selected new leader and ISR %s for offline partition [%s, %d]".format(newLeaderAndIsr.toString(), topic,
-          partition))
+        info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
         (newLeaderAndIsr, liveAssignedReplicasToThisPartition)
       case None =>
         ControllerStats.offlinePartitionRate.mark()
-        throw new PartitionOfflineException("Partition [%s, %d] doesn't have".format(topic, partition) +
-          "replicas assigned to it")
+        throw new PartitionOfflineException("Partition %s doesn't have".format(topicAndPartition) + "replicas assigned to it")
     }
   }
 }
@@ -91,8 +88,8 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
 class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
   this.logIdent = "[ReassignedPartitionLeaderSelector]: "
 
-  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
-    val reassignedReplicas = controllerContext.partitionsBeingReassigned(TopicAndPartition(topic, partition)).newReplicas
+  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+    val reassignedReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
     val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
     val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
     // pick any replica from the newly assigned replicas list that is in the ISR
@@ -105,10 +102,10 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex
         reassignedReplicas.size match {
           case 0 =>
             throw new StateChangeFailedException("List of reassigned replicas for partition " +
-              "([%s, %d]) is empty. Current leader and ISR: [%s]".format(topic, partition, currentLeaderAndIsr))
+              " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
           case _ =>
             throw new StateChangeFailedException("None of the reassigned replicas for partition " +
-              "([%s, %d]) are alive. Current leader and ISR: [%s]".format(topic, partition, currentLeaderAndIsr))
+              "%s are alive. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
         }
     }
   }
@@ -123,17 +120,16 @@ class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerConte
 with Logging {
   this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: "
 
-  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
-    val topicAndPartition = TopicAndPartition(topic, partition)
+  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
     val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
     val preferredReplica = assignedReplicas.head
     // check if preferred replica is the current leader
     val currentLeader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
     if(currentLeader == preferredReplica) {
-      throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition [%s,%d]"
-        .format(preferredReplica, topic, partition))
+      throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition %s"
+        .format(preferredReplica, topicAndPartition))
     } else {
-      info("Current leader %d for partition [%s,%d] is not the preferred replica.".format(currentLeader, topic, partition) +
+      info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
         " Trigerring preferred replica leader election")
       // check if preferred replica is not the current leader and is alive and in the isr
       if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
@@ -141,7 +137,7 @@ with Logging {
           currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
       } else {
         throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
-          "[%s,%d] is either not alive or not in the isr. Current leader and ISR: [%s]".format(topic, partition, currentLeaderAndIsr))
+          "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
       }
     }
   }
@@ -157,13 +153,13 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
 
   this.logIdent = "[ControlledShutdownLeaderSelector]: "
 
-  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
     val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
     val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
 
     val currentLeader = currentLeaderAndIsr.leader
 
-    val assignedReplicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
+    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
     val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
     val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
 
@@ -172,12 +168,13 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
     val newLeaderOpt = newIsr.headOption
     newLeaderOpt match {
       case Some(newLeader) =>
-        debug("Partition [%s,%d] : current leader = %d, new leader = %d"
-              .format(topic, partition, currentLeader, newLeader))
+        debug("Partition %s : current leader = %d, new leader = %d"
+              .format(topicAndPartition, currentLeader, newLeader))
         (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
          liveAssignedReplicas)
       case None =>
-        throw new StateChangeFailedException("No other replicas in ISR for %s-%s.".format(topic, partition))
+        throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides current leader %d and" +
+          " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, currentLeader, controllerContext.shuttingDownBrokerIds.mkString(",")))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb0d5a7f/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 372793b..9bb318c 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -130,7 +130,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
           assignReplicasToPartitions(topic, partition)
           partitionState.put(topicAndPartition, NewPartition)
-          info("Partition [%s, %d] state changed from NotExists to New with assigned replicas ".format(topic, partition) +
+          info("Partition %s state changed from NotExists to New with assigned replicas ".format(topicAndPartition) +
             "%s".format(controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")))
         case OnlinePartition =>
           assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
@@ -144,7 +144,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
               electLeaderForPartition(topic, partition, leaderSelector)
             case _ => // should never come here since illegal previous states are checked above
           }
-          info("Partition [%s, %d] state changed from %s to OnlinePartition with leader %d".format(topic, partition,
+          info("Partition %s state changed from %s to OnlinePartition with leader %d".format(topicAndPartition,
             partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader))
           partitionState.put(topicAndPartition, OnlinePartition)
            // post: partition has a leader
@@ -152,18 +152,18 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           // pre: partition should be in Online state
           assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition), OfflinePartition)
           // should be called when the leader for a partition is no longer alive
-          info("Partition [%s, %d] state changed from Online to Offline".format(topic, partition))
+          info("Partition %s state changed from Online to Offline".format(topicAndPartition))
           partitionState.put(topicAndPartition, OfflinePartition)
           // post: partition has no alive leader
         case NonExistentPartition =>
           // pre: partition could be in either of the above states
           assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
-          info("Partition [%s, %d] state changed from Offline to NotExists".format(topic, partition))
+          info("Partition %s state changed from Offline to NotExists".format(topicAndPartition))
           partitionState.put(topicAndPartition, NonExistentPartition)
           // post: partition state is deleted from all brokers and zookeeper
       }
     } catch {
-      case t: Throwable => error("State change for partition [%s, %d] ".format(topic, partition) +
+      case t: Throwable => error("State change for partition %s ".format(topicAndPartition) +
         "from %s to %s failed".format(currState, targetState), t)
     }
   }
@@ -266,8 +266,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * @param leaderSelector      Specific leader selector (e.g., offline/reassigned/etc.)
    */
   def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
+    val topicAndPartition = TopicAndPartition(topic, partition)
     // handle leader election for the partitions whose leader is no longer alive
-    info("Electing leader for partition [%s, %d]".format(topic, partition))
+    info("Electing leader for partition %s".format(topicAndPartition))
     try {
       var zookeeperPathUpdateSucceeded: Boolean = false
       var newLeaderAndIsr: LeaderAndIsr = null
@@ -281,7 +282,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             "means the current controller with epoch %d went through a soft failure and another ".format(controller.epoch) +
             "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
         // elect new leader or throw exception
-        val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topic, partition, currentLeaderAndIsr)
+        val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
         val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
           ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
           ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion)
@@ -293,15 +294,15 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
       // update the leader cache
       controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
-      info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
+      info("Elected leader %d for Offline partition %s".format(newLeaderAndIsr.leader, topicAndPartition))
       // store new leader and isr info in cache
       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
         newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
     } catch {
-      case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
-        .format(topic, partition) + " Marking this partition offline", poe)
+      case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition %s are dead."
+        .format(topicAndPartition) + " Marking this partition offline", poe)
       case sce => throw new StateChangeFailedException(("Error while electing leader for partition " +
-        " [%s, %d] due to: %s.").format(topic, partition, sce.getMessage), sce)
+        " %s due to: %s.").format(topicAndPartition, sce.getMessage), sce)
     }
     debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2))))
   }
@@ -315,11 +316,12 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   }
 
   private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
+    val topicAndPartition = TopicAndPartition(topic, partition)
     ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
       case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch
       case None =>
         throw new StateChangeFailedException("Leader and ISR information doesn't exist for partition " +
-          "[%s, %d] in %s state".format(topic, partition, partitionState(TopicAndPartition(topic, partition))))
+          "%s in %s state".format(topicAndPartition, partitionState(topicAndPartition)))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb0d5a7f/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 264285c..20d9c4f 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -110,14 +110,14 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           leaderIsrAndControllerEpochOpt match {
             case Some(leaderIsrAndControllerEpoch) =>
               if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
-                throw new StateChangeFailedException("Replica %d for partition [%s, %d] cannot be moved to NewReplica"
-                  .format(replicaId, topic, partition) + "state as it is being requested to become leader")
+                throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
+                  .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
               brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
                                                                   topic, partition, leaderIsrAndControllerEpoch, replicaAssignment.size)
             case None => // new leader request will be sent to this replica when one gets elected
           }
           replicaState.put((topic, partition, replicaId), NewReplica)
-          info("Replica %d for partition [%s, %d] state changed to NewReplica".format(replicaId, topic, partition))
+          info("Replica %d for partition %s state changed to NewReplica".format(replicaId, topicAndPartition))
         case NonExistentReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
           // send stop replica command
@@ -126,7 +126,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
           controllerContext.partitionReplicaAssignment.put(topicAndPartition,
             currentAssignedReplicas.filterNot(_ == replicaId))
-          info("Replica %d for partition [%s, %d] state changed to NonExistentReplica".format(replicaId, topic, partition))
+          info("Replica %d for partition %s state changed to NonExistentReplica".format(replicaId, topicAndPartition))
           replicaState.remove((topic, partition, replicaId))
         case OnlineReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica, OfflineReplica), targetState)
@@ -135,19 +135,19 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
               // add this replica to the assigned replicas list for its partition
               val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
               controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
-              info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
+              info("Replica %d for partition %s state changed to OnlineReplica".format(replicaId, topicAndPartition))
             case _ =>
               // check if the leader for this partition is alive or even exists
                 controllerContext.allLeaders.get(topicAndPartition) match {
                 case Some(leaderIsrAndControllerEpoch) =>
-                  val leader = leaderIsrAndControllerEpoch.leaderAndIsr.leader
-                  if (controllerContext.liveOrShuttingDownBrokerIds.contains(leader)) {
-                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
-                                                                        topic, partition, leaderIsrAndControllerEpoch,
-                                                                        replicaAssignment.size)
-                    replicaState.put((topic, partition, replicaId), OnlineReplica)
-                    info("Replica %d for partition [%s, %d] state changed to OnlineReplica"
-                         .format(replicaId, topic, partition))
+                  controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
+                    case true => // leader is alive
+                      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
+                                                                          topic, partition, leaderIsrAndControllerEpoch,
+                                                                          replicaAssignment.size)
+                      replicaState.put((topic, partition, replicaId), OnlineReplica)
+                      info("Replica %d for partition %s state changed to OnlineReplica".format(replicaId, topicAndPartition))
+                    case false => // ignore partitions whose leader is not alive
                   }
                 case None => // ignore partitions who don't have a leader yet
               }
@@ -156,30 +156,31 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
         case OfflineReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
           // As an optimization, the controller removes dead replicas from the ISR
-          val leaderAndIsrIsEmpty: Boolean = controllerContext.allLeaders.get(topicAndPartition) match {
-            case Some(currLeaderIsrAndControllerEpoch) =>
-              if (currLeaderIsrAndControllerEpoch.leaderAndIsr.isr.contains(replicaId))
-                controller.removeReplicaFromIsr(topic, partition, replicaId) match {
-                  case Some(updatedLeaderIsrAndControllerEpoch) =>
-                    // send the shrunk ISR state change request only to the leader
-                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
-                                                                        topic, partition, updatedLeaderIsrAndControllerEpoch,
-                                                                        replicaAssignment.size)
-                    replicaState.put((topic, partition, replicaId), OfflineReplica)
-                    info("Replica %d for partition [%s, %d] state changed to OfflineReplica".format(replicaId, topic, partition))
-                    info("Removed offline replica %d from ISR for partition [%s, %d]".format(replicaId, topic, partition))
-                    false
-                  case None =>
-                    true
-                }
-              else false
-            case None =>
-              true
-          }
+          val leaderAndIsrIsEmpty: Boolean =
+            controllerContext.allLeaders.get(topicAndPartition) match {
+              case Some(currLeaderIsrAndControllerEpoch) =>
+                if (currLeaderIsrAndControllerEpoch.leaderAndIsr.isr.contains(replicaId))
+                  controller.removeReplicaFromIsr(topic, partition, replicaId) match {
+                    case Some(updatedLeaderIsrAndControllerEpoch) =>
+                      // send the shrunk ISR state change request only to the leader
+                      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
+                        topic, partition, updatedLeaderIsrAndControllerEpoch,
+                        replicaAssignment.size)
+                      replicaState.put((topic, partition, replicaId), OfflineReplica)
+                      info("Replica %d for partition %s state changed to OfflineReplica".format(replicaId, topicAndPartition))
+                      info("Removed offline replica %d from ISR for partition %s".format(replicaId, topicAndPartition))
+                      false
+                    case None =>
+                      true
+                  }
+                else false
+              case None =>
+                true
+            }
           if (leaderAndIsrIsEmpty)
             throw new StateChangeFailedException(
-              "Failed to change state of replica %d for partition [%s, %d] since the leader and isr path in zookeeper is empty"
-              .format(replicaId, topic, partition))
+              "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
+              .format(replicaId, topicAndPartition))
       }
     }
     catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb0d5a7f/core/src/main/scala/kafka/utils/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Logging.scala b/core/src/main/scala/kafka/utils/Logging.scala
index d36fe23..d9f010b 100644
--- a/core/src/main/scala/kafka/utils/Logging.scala
+++ b/core/src/main/scala/kafka/utils/Logging.scala
@@ -28,7 +28,7 @@ trait Logging {
   // Force initialization to register Log4jControllerMBean
   private val log4jController = Log4jController
 
-  private def msgWithLogIdent(msg: String) = "%s%s".format(logIdent, msg)
+  private def msgWithLogIdent(msg: String) = logIdent + msg
 
   def trace(msg: => String): Unit = {
     if (logger.isTraceEnabled())

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb0d5a7f/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index c83752f..0185c14 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -442,7 +442,7 @@ object Utils extends Logging {
    * Whitespace surrounding the comma will be removed.
    */
   def parseCsvList(csvList: String): Seq[String] = {
-    if(csvList == null)
+    if(csvList == null || csvList.isEmpty)
       Seq.empty[String]
     else {
       csvList.split("\\s*,\\s*").filter(v => !v.equals(""))
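
With the change above, parseCsvList treats an empty string the same way it already treated null. A short usage sketch, assuming the patched kafka core classes are on the classpath; the comments describe the expected contents, not the exact toString output:

    import kafka.utils.Utils

    object ParseCsvListSketch extends App {
      // After this patch, empty input behaves like null input: no elements.
      println(Utils.parseCsvList(""))         // expected: empty Seq
      println(Utils.parseCsvList(null))       // expected: empty Seq
      // Whitespace around commas is trimmed and empty tokens are dropped.
      println(Utils.parseCsvList("1, 2 ,3"))  // expected: elements "1", "2", "3"
    }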

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb0d5a7f/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index f594404..113ad37 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -95,10 +95,11 @@ object ZkUtils extends Logging {
   : Option[LeaderIsrAndControllerEpoch] = {
     Json.parseFull(leaderAndIsrStr) match {
       case Some(m) =>
-        val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
-        val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
-        val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
-        val controllerEpoch = m.asInstanceOf[Map[String, String]].get("controllerEpoch").get.toInt
+        val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, String]]
+        val leader = leaderIsrAndEpochInfo.get("leader").get.toInt
+        val epoch = leaderIsrAndEpochInfo.get("leaderEpoch").get.toInt
+        val isrString = leaderIsrAndEpochInfo.get("ISR").get
+        val controllerEpoch = leaderIsrAndEpochInfo.get("controllerEpoch").get.toInt
         val isr = Utils.parseCsvList(isrString).map(r => r.toInt)
         val zkPathVersion = stat.getVersion
         debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
@@ -201,7 +202,7 @@ object ZkUtils extends Logging {
     val jsonDataMap = new HashMap[String, String]
     jsonDataMap.put("leader", leaderAndIsr.leader.toString)
     jsonDataMap.put("leaderEpoch", leaderAndIsr.leaderEpoch.toString)
-    jsonDataMap.put("ISR", leaderAndIsr.isr.mkString(","))
+    jsonDataMap.put("ISR", if(leaderAndIsr.isr.isEmpty) "" else leaderAndIsr.isr.mkString(","))
     jsonDataMap.put("controllerEpoch", controllerEpoch.toString)
     Utils.stringMapToJson(jsonDataMap)
   }
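
Together with the parseCsvList change, the two ZkUtils edits above let an empty ISR round-trip through the zookeeper path: an empty list is written as an empty "ISR" string, and an empty "ISR" string parses back to an empty list instead of failing. A rough sketch of that round trip using plain Scala stand-ins (not the real ZkUtils or Json helpers):

    object EmptyIsrRoundTripSketch extends App {
      // Write-side stand-in: mirrors the leaderAndIsrZkData change, where an
      // empty ISR is serialized explicitly as an empty string.
      def isrToZkString(isr: List[Int]): String =
        if (isr.isEmpty) "" else isr.mkString(",")

      // Read-side stand-in: mirrors Utils.parseCsvList after this patch,
      // where an empty string yields no elements instead of a lone "".
      def zkStringToIsr(s: String): List[Int] =
        if (s == null || s.isEmpty) List.empty[Int]
        else s.split("\\s*,\\s*").filter(_.nonEmpty).map(_.toInt).toList

      val written  = isrToZkString(List.empty[Int])
      val readBack = zkStringToIsr(written)
      println("written: \"" + written + "\", read back: " + readBack)  // "" -> List()
    }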

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb0d5a7f/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index a4d3a27..cce6c8e 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -62,4 +62,16 @@ class UtilsTest extends JUnitSuite {
     }
   }
 
+  @Test
+  def testCsvList() {
+    val emptyString:String = ""
+    val nullString:String = null
+    val emptyList = Utils.parseCsvList(emptyString)
+    val emptyListFromNullString = Utils.parseCsvList(nullString)
+    val emptyStringList = Seq.empty[String]
+    assertTrue(emptyList!=null)
+    assertTrue(emptyListFromNullString!=null)
+    assertTrue(emptyStringList.equals(emptyListFromNullString))
+    assertTrue(emptyStringList.equals(emptyList))
+  }
 }