You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/07/09 01:14:47 UTC
[22/43] git commit: KAFKA-849 Bug in controller's startup/failover
logic fails to update in memory leader and isr cache causing other state
changes to work incorrectly; reviewed by Jun Rao and Swapnil Ghike
KAFKA-849 Bug in controller's startup/failover logic fails to update in memory leader and isr cache causing other state changes to work incorrectly; reviewed by Jun Rao and Swapnil Ghike
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/756be536
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/756be536
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/756be536
Branch: refs/heads/trunk
Commit: 756be5363b83782b55922dfedd8ef9d3b405199c
Parents: 1d3c343
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Apr 4 21:27:20 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Apr 4 21:27:20 2013 -0700
----------------------------------------------------------------------
.../src/main/scala/kafka/admin/AdminUtils.scala | 2 +-
.../scala/kafka/admin/ListTopicCommand.scala | 3 +-
.../scala/kafka/api/LeaderAndIsrRequest.scala | 37 ++++++++++++--------
.../main/scala/kafka/api/TopicMetadata.scala | 1 +
.../main/scala/kafka/cluster/Partition.scala | 4 +--
.../controller/ControllerChannelManager.scala | 4 +--
.../kafka/controller/KafkaController.scala | 25 ++++++-------
core/src/main/scala/kafka/log/LogSegment.scala | 2 +-
.../kafka/server/AbstractFetcherManager.scala | 2 +-
.../kafka/server/ReplicaFetcherThread.scala | 4 +++
.../scala/kafka/server/ReplicaManager.scala | 7 ++--
11 files changed, 51 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index f4bf3b9..63f5bc8 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -136,7 +136,7 @@ object AdminUtils extends Logging {
new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
} catch {
case e =>
- error("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
+ debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/admin/ListTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ListTopicCommand.scala b/core/src/main/scala/kafka/admin/ListTopicCommand.scala
index baeb099..095469b 100644
--- a/core/src/main/scala/kafka/admin/ListTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/ListTopicCommand.scala
@@ -82,9 +82,8 @@ object ListTopicCommand {
case ErrorMapping.UnknownTopicOrPartitionCode =>
println("topic " + topic + " doesn't exist!")
case _ =>
- println("topic: " + topic)
for (part <- topicMetaData.partitionsMetadata)
- println(part.toString)
+ println("topic: " + topic + "\t" + part.toString)
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 3b7ee24..68e64d6 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -79,6 +79,13 @@ case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndContr
4 /* replication factor */
size
}
+
+ override def toString(): String = {
+ val partitionStateInfo = new StringBuilder
+ partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
+ partitionStateInfo.append(",ReplicationFactor:" + replicationFactor + ")")
+ partitionStateInfo.toString()
+ }
}
object LeaderAndIsrRequest {
@@ -121,13 +128,13 @@ case class LeaderAndIsrRequest (versionId: Short,
controllerId: Int,
controllerEpoch: Int,
partitionStateInfos: Map[(String, Int), PartitionStateInfo],
- leaders: Set[Broker])
+ aliveLeaders: Set[Broker])
extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey), correlationId) {
- def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerId: Int,
+ def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], aliveLeaders: Set[Broker], controllerId: Int,
controllerEpoch: Int, correlationId: Int, clientId: String) = {
this(LeaderAndIsrRequest.CurrentVersion, correlationId, clientId, LeaderAndIsrRequest.DefaultAckTimeout,
- controllerId, controllerEpoch, partitionStateInfos, liveBrokers)
+ controllerId, controllerEpoch, partitionStateInfos, aliveLeaders)
}
def writeTo(buffer: ByteBuffer) {
@@ -143,8 +150,8 @@ case class LeaderAndIsrRequest (versionId: Short,
buffer.putInt(key._2)
value.writeTo(buffer)
}
- buffer.putInt(leaders.size)
- leaders.foreach(_.writeTo(buffer))
+ buffer.putInt(aliveLeaders.size)
+ aliveLeaders.foreach(_.writeTo(buffer))
}
def sizeInBytes(): Int = {
@@ -159,22 +166,22 @@ case class LeaderAndIsrRequest (versionId: Short,
for((key, value) <- partitionStateInfos)
size += (2 + key._1.length) /* topic */ + 4 /* partition */ + value.sizeInBytes /* partition state info */
size += 4 /* number of leader brokers */
- for(broker <- leaders)
+ for(broker <- aliveLeaders)
size += broker.sizeInBytes /* broker info */
size
}
override def toString(): String = {
val leaderAndIsrRequest = new StringBuilder
- leaderAndIsrRequest.append("Name: " + this.getClass.getSimpleName)
- leaderAndIsrRequest.append("; Version: " + versionId)
- leaderAndIsrRequest.append("; Controller: " + controllerId)
- leaderAndIsrRequest.append("; ControllerEpoch: " + controllerEpoch)
- leaderAndIsrRequest.append("; CorrelationId: " + correlationId)
- leaderAndIsrRequest.append("; ClientId: " + clientId)
- leaderAndIsrRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
- leaderAndIsrRequest.append("; PartitionStateInfo: " + partitionStateInfos.mkString(","))
- leaderAndIsrRequest.append("; Leaders: " + leaders.mkString(","))
+ leaderAndIsrRequest.append("Name:" + this.getClass.getSimpleName)
+ leaderAndIsrRequest.append(";Version:" + versionId)
+ leaderAndIsrRequest.append(";Controller:" + controllerId)
+ leaderAndIsrRequest.append(";ControllerEpoch:" + controllerEpoch)
+ leaderAndIsrRequest.append(";CorrelationId:" + correlationId)
+ leaderAndIsrRequest.append(";ClientId:" + clientId)
+ leaderAndIsrRequest.append(";AckTimeoutMs:" + ackTimeoutMs + " ms")
+ leaderAndIsrRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
+ leaderAndIsrRequest.append(";Leaders:" + aliveLeaders.mkString(","))
leaderAndIsrRequest.toString()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index f822678..a0d68c5 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -115,6 +115,7 @@ case class PartitionMetadata(partitionId: Int,
partitionMetadataString.append("\tleader: " + (if(leader.isDefined) formatBroker(leader.get) else "none"))
partitionMetadataString.append("\treplicas: " + replicas.map(formatBroker).mkString(","))
partitionMetadataString.append("\tisr: " + isr.map(formatBroker).mkString(","))
+ partitionMetadataString.append("\tisUnderReplicated: %s".format(if(isr.size < replicas.size) "true" else "false"))
partitionMetadataString.toString()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index aa2092e..d5f5a4e 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -166,7 +166,7 @@ class Partition(val topic: String,
* 4. start a fetcher to the new leader
*/
def makeFollower(controllerId: Int, topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
- liveBrokers: Set[Broker], correlationId: Int): Boolean = {
+ aliveLeaders: Set[Broker], correlationId: Int): Boolean = {
leaderIsrUpdateLock synchronized {
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
if (leaderEpoch >= leaderAndIsr.leaderEpoch) {
@@ -185,7 +185,7 @@ class Partition(val topic: String,
// on the leader
val localReplica = getOrCreateReplica()
val newLeaderBrokerId: Int = leaderAndIsr.leader
- liveBrokers.find(_.id == newLeaderBrokerId) match {
+ aliveLeaders.find(_.id == newLeaderBrokerId) match {
case Some(leaderBroker) =>
// stop fetcher thread to previous leader
replicaFetcherManager.removeFetcher(topic, partitionId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index f7a7bd4..3164f78 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -190,8 +190,8 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
val broker = m._1
val partitionStateInfos = m._2.toMap
val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
- val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
- val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId,
+ val aliveLeaders = liveBrokers.filter(b => leaderIds.contains(b.id))
+ val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, aliveLeaders, controllerId, controllerEpoch, correlationId,
clientId)
for (p <- partitionStateInfos) {
val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 74614d8..65def03 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -238,8 +238,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
partitionStateMachine.registerListeners()
replicaStateMachine.registerListeners()
initializeControllerContext()
- partitionStateMachine.startup()
replicaStateMachine.startup()
+ partitionStateMachine.startup()
Utils.registerMBean(this, KafkaController.MBeanName)
info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
initializeAndMaybeTriggerPartitionReassignment()
@@ -523,16 +523,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
private def updateLeaderAndIsrCache() {
val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, controllerContext.allTopics.toSeq)
- for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo) {
- // If the leader specified in the leaderAndIsr is no longer alive, there is no need to recover it
- controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
- case true =>
- controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
- case false =>
- debug("While refreshing controller's leader and isr cache, leader %d for ".format(leaderIsrAndControllerEpoch.leaderAndIsr.leader) +
- "partition %s is dead, just ignore it".format(topicPartition))
- }
- }
+ for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
+ controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
}
private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = {
@@ -982,7 +974,16 @@ case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
-case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)
+case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
+ override def toString(): String = {
+ val leaderAndIsrInfo = new StringBuilder
+ leaderAndIsrInfo.append("(Leader:" + leaderAndIsr.leader)
+ leaderAndIsrInfo.append(",ISR:" + leaderAndIsr.isr.mkString(","))
+ leaderAndIsrInfo.append(",LeaderEpoch:" + leaderAndIsr.leaderEpoch)
+ leaderAndIsrInfo.append(",ControllerEpoch:" + controllerEpoch + ")")
+ leaderAndIsrInfo.toString()
+ }
+}
object ControllerStats extends KafkaMetricsGroup {
val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 2e40629..213db6e 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -135,7 +135,7 @@ class LogSegment(val messageSet: FileMessageSet,
/**
* Calculate the offset that would be used for the next message to be append to this segment.
- * Not that this is expensive.
+ * Note that this is expensive.
*/
def nextOffset(): Long = {
val ms = read(index.lastOffset, messageSet.sizeInBytes, None)
http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index e8702e2..be872dc 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -46,7 +46,7 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
fetcherThread.start
}
fetcherThread.addPartition(topic, partitionId, initialOffset)
- info("adding fetcher on topic %s, partion %d, initOffset %d to broker %d with fetcherId %d"
+ info("adding fetcher on topic %s, partition %d, initOffset %d to broker %d with fetcherId %d"
.format(topic, partitionId, initialOffset, sourceBroker.id, key.fetcherId))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index d4f15c1..b733fa3 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -83,6 +83,8 @@ class ReplicaFetcherThread(name:String,
val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
if (leaderEndOffset < log.logEndOffset) {
log.truncateTo(leaderEndOffset)
+ warn("Replica %d for partition %s reset its fetch offset to current leader %d's latest offset %d"
+ .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderEndOffset))
leaderEndOffset
} else {
/**
@@ -93,6 +95,8 @@ class ReplicaFetcherThread(name:String,
*/
val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
log.truncateAndStartWithNewOffset(leaderStartOffset)
+ warn("Replica %d for partition %s reset its fetch offset to current leader %d's start offset %d"
+ .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderStartOffset))
leaderStartOffset
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/756be536/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 68e712c..6d849ac 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -219,7 +219,7 @@ class ReplicaManager(val config: KafkaConfig,
if(requestedLeaderId == config.brokerId)
makeLeader(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.correlationId)
else
- makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders,
+ makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.aliveLeaders,
leaderAndISRRequest.correlationId)
} catch {
case e =>
@@ -264,15 +264,14 @@ class ReplicaManager(val config: KafkaConfig,
}
private def makeFollower(controllerId: Int, epoch: Int, topic: String, partitionId: Int,
- partitionStateInfo: PartitionStateInfo, liveBrokers: Set[Broker], correlationId: Int) {
+ partitionStateInfo: PartitionStateInfo, aliveLeaders: Set[Broker], correlationId: Int) {
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
- val leaderBrokerId: Int = leaderIsrAndControllerEpoch.leaderAndIsr.leader
stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"starting the become-follower transition for partition [%s,%d]")
.format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId))
val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
- if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, liveBrokers, correlationId)) {
+ if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, aliveLeaders, correlationId)) {
// remove this replica's partition from the ISR expiration queue
leaderPartitionsLock synchronized {
leaderPartitions -= partition