You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/11/29 18:29:13 UTC
git commit: kafka-1152;
ReplicaManager's handling of the leaderAndIsrRequest should
gracefully handle leader == -1; patched by Swapnil Ghike; reviewed by Jun Rao
Updated Branches:
refs/heads/trunk df288b75a -> 224f192c7
kafka-1152; ReplicaManager's handling of the leaderAndIsrRequest should gracefully handle leader == -1; patched by Swapnil Ghike; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/224f192c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/224f192c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/224f192c
Branch: refs/heads/trunk
Commit: 224f192c776b4348596fbf068771c98aa9db9f3d
Parents: df288b7
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Fri Nov 29 09:29:56 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Nov 29 09:29:56 2013 -0800
----------------------------------------------------------------------
.../scala/kafka/server/ReplicaManager.scala | 24 ++++++++++++++------
1 file changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/224f192c/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 161f581..54f6e16 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -328,8 +328,7 @@ class ReplicaManager(val config: KafkaConfig,
* the error message will be set on each partition since we do not know which partition caused it
* TODO: the above may need to be fixed later
*/
- private def makeFollowers(controllerId: Int, epoch: Int,
- partitionState: Map[Partition, PartitionStateInfo],
+ private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo],
leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) {
stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"starting the become-follower transition for partitions %s")
@@ -351,11 +350,22 @@ class ReplicaManager(val config: KafkaConfig,
.format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
if (!isShuttingDown.get()) {
- replicaFetcherManager.addFetcherForPartitions(partitionState.map{ case(partition, partitionStateInfo) =>
- new TopicAndPartition(partition) ->
- BrokerAndInitialOffset(leaders.find(_.id == partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader).get,
- partition.getReplica().get.logEndOffset)}
- )
+ val partitionAndOffsets = mutable.Map[TopicAndPartition, BrokerAndInitialOffset]()
+ partitionState.foreach {
+ case (partition, partitionStateInfo) =>
+ val leader = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+ leaders.find(_.id == leader) match {
+ case Some(leaderBroker) =>
+ partitionAndOffsets.put(new TopicAndPartition(partition),
+ BrokerAndInitialOffset(leaderBroker, partition.getReplica().get.logEndOffset))
+ case None =>
+ stateChangeLogger.trace("Broker %d ignored the become-follower state change with correlation id %d " +
+ "controller %d epoch %d for topic-partition %s since the designated leader %d " +
+ "cannot be found in live or shutting down brokers %s"
+ .format(localBrokerId, correlationId, controllerId, epoch, partition, leader, leaders))
+ }
+ }
+ replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
}
else {
stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " +