You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/07/09 01:14:47 UTC

[22/43] git commit: KAFKA-849 Bug in controller's startup/failover logic fails to update in memory leader and isr cache causing other state changes to work incorrectly; reviewed by Jun Rao and Swapnil Ghike

KAFKA-849 Bug in controller's startup/failover logic fails to update in memory leader and isr cache causing other state changes to work incorrectly; reviewed by Jun Rao and Swapnil Ghike


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/756be536
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/756be536
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/756be536

Branch: refs/heads/trunk
Commit: 756be5363b83782b55922dfedd8ef9d3b405199c
Parents: 1d3c343
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Apr 4 21:27:20 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Apr 4 21:27:20 2013 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |  2 +-
 .../scala/kafka/admin/ListTopicCommand.scala    |  3 +-
 .../scala/kafka/api/LeaderAndIsrRequest.scala   | 37 ++++++++++++--------
 .../main/scala/kafka/api/TopicMetadata.scala    |  1 +
 .../main/scala/kafka/cluster/Partition.scala    |  4 +--
 .../controller/ControllerChannelManager.scala   |  4 +--
 .../kafka/controller/KafkaController.scala      | 25 ++++++-------
 core/src/main/scala/kafka/log/LogSegment.scala  |  2 +-
 .../kafka/server/AbstractFetcherManager.scala   |  2 +-
 .../kafka/server/ReplicaFetcherThread.scala     |  4 +++
 .../scala/kafka/server/ReplicaManager.scala     |  7 ++--
 11 files changed, 51 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index f4bf3b9..63f5bc8 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -136,7 +136,7 @@ object AdminUtils extends Logging {
           new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
         } catch {
           case e =>
-            error("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
+            debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
             new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
               ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/admin/ListTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ListTopicCommand.scala b/core/src/main/scala/kafka/admin/ListTopicCommand.scala
index baeb099..095469b 100644
--- a/core/src/main/scala/kafka/admin/ListTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/ListTopicCommand.scala
@@ -82,9 +82,8 @@ object ListTopicCommand {
       case ErrorMapping.UnknownTopicOrPartitionCode =>
         println("topic " + topic + " doesn't exist!")
       case _ =>
-        println("topic: " + topic)
         for (part <- topicMetaData.partitionsMetadata)
-          println(part.toString)
+          println("topic: " + topic + "\t" + part.toString)
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 3b7ee24..68e64d6 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -79,6 +79,13 @@ case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndContr
       4 /* replication factor */
     size
   }
+  
+  override def toString(): String = {
+    val partitionStateInfo = new StringBuilder
+    partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
+    partitionStateInfo.append(",ReplicationFactor:" + replicationFactor + ")")
+    partitionStateInfo.toString()
+  }
 }
 
 object LeaderAndIsrRequest {
@@ -121,13 +128,13 @@ case class LeaderAndIsrRequest (versionId: Short,
                                 controllerId: Int,
                                 controllerEpoch: Int,
                                 partitionStateInfos: Map[(String, Int), PartitionStateInfo],
-                                leaders: Set[Broker])
+                                aliveLeaders: Set[Broker])
     extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey), correlationId) {
 
-  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerId: Int,
+  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], aliveLeaders: Set[Broker], controllerId: Int,
            controllerEpoch: Int, correlationId: Int, clientId: String) = {
     this(LeaderAndIsrRequest.CurrentVersion, correlationId, clientId, LeaderAndIsrRequest.DefaultAckTimeout,
-         controllerId, controllerEpoch, partitionStateInfos, liveBrokers)
+         controllerId, controllerEpoch, partitionStateInfos, aliveLeaders)
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -143,8 +150,8 @@ case class LeaderAndIsrRequest (versionId: Short,
       buffer.putInt(key._2)
       value.writeTo(buffer)
     }
-    buffer.putInt(leaders.size)
-    leaders.foreach(_.writeTo(buffer))
+    buffer.putInt(aliveLeaders.size)
+    aliveLeaders.foreach(_.writeTo(buffer))
   }
 
   def sizeInBytes(): Int = {
@@ -159,22 +166,22 @@ case class LeaderAndIsrRequest (versionId: Short,
     for((key, value) <- partitionStateInfos)
       size += (2 + key._1.length) /* topic */ + 4 /* partition */ + value.sizeInBytes /* partition state info */
     size += 4 /* number of leader brokers */
-    for(broker <- leaders)
+    for(broker <- aliveLeaders)
       size += broker.sizeInBytes /* broker info */
     size
   }
 
   override def toString(): String = {
     val leaderAndIsrRequest = new StringBuilder
-    leaderAndIsrRequest.append("Name: " + this.getClass.getSimpleName)
-    leaderAndIsrRequest.append("; Version: " + versionId)
-    leaderAndIsrRequest.append("; Controller: " + controllerId)
-    leaderAndIsrRequest.append("; ControllerEpoch: " + controllerEpoch)
-    leaderAndIsrRequest.append("; CorrelationId: " + correlationId)
-    leaderAndIsrRequest.append("; ClientId: " + clientId)
-    leaderAndIsrRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
-    leaderAndIsrRequest.append("; PartitionStateInfo: " + partitionStateInfos.mkString(","))
-    leaderAndIsrRequest.append("; Leaders: " + leaders.mkString(","))
+    leaderAndIsrRequest.append("Name:" + this.getClass.getSimpleName)
+    leaderAndIsrRequest.append(";Version:" + versionId)
+    leaderAndIsrRequest.append(";Controller:" + controllerId)
+    leaderAndIsrRequest.append(";ControllerEpoch:" + controllerEpoch)
+    leaderAndIsrRequest.append(";CorrelationId:" + correlationId)
+    leaderAndIsrRequest.append(";ClientId:" + clientId)
+    leaderAndIsrRequest.append(";AckTimeoutMs:" + ackTimeoutMs + " ms")
+    leaderAndIsrRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
+    leaderAndIsrRequest.append(";Leaders:" + aliveLeaders.mkString(","))
     leaderAndIsrRequest.toString()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index f822678..a0d68c5 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -115,6 +115,7 @@ case class PartitionMetadata(partitionId: Int,
     partitionMetadataString.append("\tleader: " + (if(leader.isDefined) formatBroker(leader.get) else "none"))
     partitionMetadataString.append("\treplicas: " + replicas.map(formatBroker).mkString(","))
     partitionMetadataString.append("\tisr: " + isr.map(formatBroker).mkString(","))
+    partitionMetadataString.append("\tisUnderReplicated: %s".format(if(isr.size < replicas.size) "true" else "false"))
     partitionMetadataString.toString()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index aa2092e..d5f5a4e 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -166,7 +166,7 @@ class Partition(val topic: String,
    *  4. start a fetcher to the new leader
    */
   def makeFollower(controllerId: Int, topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
-                   liveBrokers: Set[Broker], correlationId: Int): Boolean = {
+                   aliveLeaders: Set[Broker], correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       if (leaderEpoch >= leaderAndIsr.leaderEpoch) {
@@ -185,7 +185,7 @@ class Partition(val topic: String,
       // on the leader
       val localReplica = getOrCreateReplica()
       val newLeaderBrokerId: Int = leaderAndIsr.leader
-      liveBrokers.find(_.id == newLeaderBrokerId) match {
+      aliveLeaders.find(_.id == newLeaderBrokerId) match {
         case Some(leaderBroker) =>
           // stop fetcher thread to previous leader
           replicaFetcherManager.removeFetcher(topic, partitionId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index f7a7bd4..3164f78 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -190,8 +190,8 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
       val broker = m._1
       val partitionStateInfos = m._2.toMap
       val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
-      val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId,
+      val aliveLeaders = liveBrokers.filter(b => leaderIds.contains(b.id))
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, aliveLeaders, controllerId, controllerEpoch, correlationId,
                                                         clientId)
       for (p <- partitionStateInfos) {
         val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"

http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 74614d8..65def03 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -238,8 +238,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       partitionStateMachine.registerListeners()
       replicaStateMachine.registerListeners()
       initializeControllerContext()
-      partitionStateMachine.startup()
       replicaStateMachine.startup()
+      partitionStateMachine.startup()
       Utils.registerMBean(this, KafkaController.MBeanName)
       info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
       initializeAndMaybeTriggerPartitionReassignment()
@@ -523,16 +523,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
 
   private def updateLeaderAndIsrCache() {
     val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, controllerContext.allTopics.toSeq)
-    for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo) {
-      // If the leader specified in the leaderAndIsr is no longer alive, there is no need to recover it
-      controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
-        case true =>
-          controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
-        case false =>
-          debug("While refreshing controller's leader and isr cache, leader %d for ".format(leaderIsrAndControllerEpoch.leaderAndIsr.leader) +
-            "partition %s is dead, just ignore it".format(topicPartition))
-      }
-    }
+    for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
+      controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
   }
 
   private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = {
@@ -982,7 +974,16 @@ case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
 
 case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
 
-case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)
+case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
+  override def toString(): String = {
+    val leaderAndIsrInfo = new StringBuilder
+    leaderAndIsrInfo.append("(Leader:" + leaderAndIsr.leader)
+    leaderAndIsrInfo.append(",ISR:" + leaderAndIsr.isr.mkString(","))
+    leaderAndIsrInfo.append(",LeaderEpoch:" + leaderAndIsr.leaderEpoch)
+    leaderAndIsrInfo.append(",ControllerEpoch:" + controllerEpoch + ")")
+    leaderAndIsrInfo.toString()
+  }
+}
 
 object ControllerStats extends KafkaMetricsGroup {
   val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)

http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 2e40629..213db6e 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -135,7 +135,7 @@ class LogSegment(val messageSet: FileMessageSet,
   
   /**
    * Calculate the offset that would be used for the next message to be append to this segment.
-   * Not that this is expensive.
+   * Note that this is expensive.
    */
   def nextOffset(): Long = {
     val ms = read(index.lastOffset, messageSet.sizeInBytes, None)

http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index e8702e2..be872dc 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -46,7 +46,7 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
           fetcherThread.start
       }
       fetcherThread.addPartition(topic, partitionId, initialOffset)
-      info("adding fetcher on topic %s, partion %d, initOffset %d to broker %d with fetcherId %d"
+      info("adding fetcher on topic %s, partition %d, initOffset %d to broker %d with fetcherId %d"
           .format(topic, partitionId, initialOffset, sourceBroker.id, key.fetcherId))
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index d4f15c1..b733fa3 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -83,6 +83,8 @@ class ReplicaFetcherThread(name:String,
     val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
     if (leaderEndOffset < log.logEndOffset) {
       log.truncateTo(leaderEndOffset)
+      warn("Replica %d for partition %s reset its fetch offset to current leader %d's latest offset %d"
+        .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderEndOffset))
       leaderEndOffset
     } else {
       /**
@@ -93,6 +95,8 @@ class ReplicaFetcherThread(name:String,
        */
       val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
       log.truncateAndStartWithNewOffset(leaderStartOffset)
+      warn("Replica %d for partition %s reset its fetch offset to current leader %d's start offset %d"
+        .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderStartOffset))
       leaderStartOffset
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 68e712c..6d849ac 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -219,7 +219,7 @@ class ReplicaManager(val config: KafkaConfig,
           if(requestedLeaderId == config.brokerId)
             makeLeader(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.correlationId)
           else
-            makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders,
+            makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.aliveLeaders,
                          leaderAndISRRequest.correlationId)
         } catch {
           case e =>
@@ -264,15 +264,14 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   private def makeFollower(controllerId: Int, epoch: Int, topic: String, partitionId: Int,
-                           partitionStateInfo: PartitionStateInfo, liveBrokers: Set[Broker], correlationId: Int) {
+                           partitionStateInfo: PartitionStateInfo, aliveLeaders: Set[Broker], correlationId: Int) {
     val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
-    val leaderBrokerId: Int = leaderIsrAndControllerEpoch.leaderAndIsr.leader
     stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
                              "starting the become-follower transition for partition [%s,%d]")
                                .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId))
 
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, liveBrokers, correlationId)) {
+    if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, aliveLeaders, correlationId)) {
       // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
         leaderPartitions -= partition