You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/11/01 18:28:18 UTC
git commit: KAFKA-1097 Race condition while reassigning low
throughput partition leads to incorrect ISR information in zookeeper;
reviewed by Jun Rao, Guozhang Wang
Updated Branches:
refs/heads/trunk e602ed058 -> df7f7a255
KAFKA-1097 Race condition while reassigning low throughput partition leads to incorrect ISR information in zookeeper; reviewed by Jun Rao, Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/df7f7a25
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/df7f7a25
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/df7f7a25
Branch: refs/heads/trunk
Commit: df7f7a255bdd803cddfdf3fea2ace4f1ae366377
Parents: e602ed0
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Nov 1 10:27:36 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Nov 1 10:27:36 2013 -0700
----------------------------------------------------------------------
.../kafka/admin/ReassignPartitionsCommand.scala | 14 +-
.../main/scala/kafka/cluster/Partition.scala | 49 +++++--
.../common/NotAssignedReplicaException.scala | 23 ++++
.../kafka/controller/KafkaController.scala | 127 +++++++++++++++----
.../kafka/controller/ReplicaStateMachine.scala | 2 +
.../scala/kafka/server/ReplicaManager.scala | 63 +++++----
kafka-patch-review.py | 6 +-
project/Build.scala | 2 +-
8 files changed, 214 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/df7f7a25/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 2f706c9..70d1b81 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -104,10 +104,14 @@ object ReassignPartitionsCommand extends Logging {
}
}
} else if(options.has(topicsToMoveJsonFileOpt)) {
+ if(!options.has(brokerListOpt)) {
+ System.err.println("broker-list is required if topics-to-move-json-file is used")
+ parser.printHelpOn(System.err)
+ System.exit(1)
+ }
val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt)
- val brokerList = options.valueOf(brokerListOpt)
+ val brokerListToReassign = options.valueOf(brokerListOpt).split(',').map(_.toInt)
val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
- val brokerListToReassign = brokerList.split(',') map (_.toInt)
val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
@@ -117,7 +121,6 @@ object ReassignPartitionsCommand extends Logging {
topicInfo._2.head._2.size)
partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
}
-
} else if (options.has(manualAssignmentJsonFileOpt)) {
val manualAssignmentJsonFile = options.valueOf(manualAssignmentJsonFileOpt)
val manualAssignmentJsonString = Utils.readFileAsString(manualAssignmentJsonFile)
@@ -175,8 +178,11 @@ object ReassignPartitionsCommand extends Logging {
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition)
if(assignedReplicas == newReplicas)
ReassignmentCompleted
- else
+ else {
+ println(("ERROR: Assigned replicas (%s) don't match the list of replicas for reassignment (%s)" +
+ " for partition %s").format(assignedReplicas.mkString(","), newReplicas.mkString(","), topicAndPartition))
ReassignmentFailed
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/df7f7a25/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index d8078bd..02ccc17 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -20,7 +20,7 @@ import scala.collection._
import kafka.admin.AdminUtils
import kafka.utils._
import java.lang.Object
-import kafka.api.LeaderAndIsr
+import kafka.api.{PartitionStateInfo, LeaderAndIsr}
import kafka.log.LogConfig
import kafka.server.ReplicaManager
import com.yammer.metrics.core.Gauge
@@ -28,7 +28,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
import org.apache.log4j.Logger
import kafka.message.ByteBufferMessageSet
-import kafka.common.{TopicAndPartition, NotLeaderForPartitionException, ErrorMapping}
+import kafka.common.{NotAssignedReplicaException, TopicAndPartition, NotLeaderForPartitionException, ErrorMapping}
/**
@@ -131,25 +131,34 @@ class Partition(val topic: String,
assignedReplicaMap.values.toSet
}
+ def removeReplica(replicaId: Int) {
+ assignedReplicaMap.remove(replicaId)
+ }
+
def getLeaderEpoch(): Int = {
leaderIsrUpdateLock synchronized {
return this.leaderEpoch
}
}
-
/**
* Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
* and setting the new leader and ISR
*/
- def makeLeader(controllerId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = {
+ def makeLeader(controllerId: Int,
+ partitionStateInfo: PartitionStateInfo, correlationId: Int): Boolean = {
leaderIsrUpdateLock synchronized {
+ val allReplicas = partitionStateInfo.allReplicas
+ val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
-
+ // add replicas that are new
+ allReplicas.foreach(replica => getOrCreateReplica(replica))
val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
+ // remove assigned replicas that have been removed by the controller
+ (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
// reset LogEndOffset for remote replicas
assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
inSyncReplicas = newInSyncReplicas
@@ -165,16 +174,24 @@ class Partition(val topic: String,
/**
* Make the local replica the follower by setting the new leader and ISR to empty
*/
- def makeFollower(controllerId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, leaders: Set[Broker], correlationId: Int): Boolean = {
+ def makeFollower(controllerId: Int,
+ partitionStateInfo: PartitionStateInfo,
+ leaders: Set[Broker], correlationId: Int): Boolean = {
leaderIsrUpdateLock synchronized {
+ val allReplicas = partitionStateInfo.allReplicas
+ val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
val newLeaderBrokerId: Int = leaderAndIsr.leader
+ // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
+ // to maintain the decision maker controller's epoch in the zookeeper path
+ controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
// TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
leaders.find(_.id == newLeaderBrokerId) match {
case Some(leaderBroker) =>
- // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
- // to maintain the decision maker controller's epoch in the zookeeper path
- controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
+ // add replicas that are new
+ allReplicas.foreach(r => getOrCreateReplica(r))
+ // remove assigned replicas that have been removed by the controller
+ (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
inSyncReplicas = Set.empty[Replica]
leaderEpoch = leaderAndIsr.leaderEpoch
zkVersion = leaderAndIsr.zkVersion
@@ -192,7 +209,13 @@ class Partition(val topic: String,
def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
leaderIsrUpdateLock synchronized {
debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
- val replica = getOrCreateReplica(replicaId)
+ val replicaOpt = getReplica(replicaId)
+ if(!replicaOpt.isDefined) {
+ throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d for partition [%s,%d] since the replica %d" +
+ " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId,
+ offset, topic, partitionId, replicaId, assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId))
+ }
+ val replica = replicaOpt.get
replica.logEndOffset = offset
// check if this replica needs to be added to the ISR
@@ -200,7 +223,11 @@ class Partition(val topic: String,
case Some(leaderReplica) =>
val replica = getReplica(replicaId).get
val leaderHW = leaderReplica.highWatermark
- if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= leaderHW) {
+ // For a replica to get added back to ISR, it has to satisfy 3 conditions-
+ // 1. It is not already in the ISR
+ // 2. It is part of the assigned replica list. See KAFKA-1097
+ // 3. Its log end offset >= leader's highwatermark
+ if (!inSyncReplicas.contains(replica) && assignedReplicas.map(_.brokerId).contains(replicaId) && replica.logEndOffset >= leaderHW) {
// expand ISR
val newInSyncReplicas = inSyncReplicas + replica
info("Expanding ISR for partition [%s,%d] from %s to %s"
http://git-wip-us.apache.org/repos/asf/kafka/blob/df7f7a25/core/src/main/scala/kafka/common/NotAssignedReplicaException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/NotAssignedReplicaException.scala b/core/src/main/scala/kafka/common/NotAssignedReplicaException.scala
new file mode 100644
index 0000000..409d112
--- /dev/null
+++ b/core/src/main/scala/kafka/common/NotAssignedReplicaException.scala
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class NotAssignedReplicaException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+ def this(message: String) = this(message, null)
+ def this() = this(null, null)
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/df7f7a25/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 88d130f..88792c2 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -35,6 +35,7 @@ import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
import java.util.concurrent.atomic.AtomicInteger
import scala.Some
import kafka.common.TopicAndPartition
+import org.apache.log4j.Logger
class ControllerContext(val zkClient: ZkClient,
val zkSessionTimeout: Int,
@@ -105,6 +106,7 @@ object KafkaController extends Logging {
class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
this.logIdent = "[Controller " + config.brokerId + "]: "
private var isRunning = true
+ private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
private val partitionStateMachine = new PartitionStateMachine(this)
private val replicaStateMachine = new ReplicaStateMachine(this)
@@ -359,17 +361,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
* Reassigning replicas for a partition goes through a few stages -
* RAR = Reassigned replicas
* AR = Original list of replicas for partition
- * 1. Start new replicas RAR - AR.
- * 2. Wait until new replicas are in sync with the leader
- * 3. If the leader is not in RAR, elect a new leader from RAR
- * 4. Stop old replicas AR - RAR
- * 5. Write new AR
- * 6. Remove partition from the /admin/reassign_partitions path
+ * 1. Write new AR = AR + RAR
+ * 2. Start new replicas RAR - AR.
+ * 3. Wait until new replicas are in sync with the leader
+ * 4. If the leader is not in RAR, elect a new leader from RAR
+ * 5. Stop old replicas AR - RAR
+ * 6. Write new AR = RAR
+ * 7. Remove partition from the /admin/reassign_partitions path
*/
def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignedReplicas = reassignedPartitionContext.newReplicas
areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match {
case true =>
+ val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
// mark the new replicas as online
reassignedReplicas.foreach { replica =>
replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
@@ -378,9 +382,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
// check if current leader is in the new replicas list. If not, controller needs to trigger leader election
moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
// stop older replicas
- stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext)
+ stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
// write the new list of replicas for this partition in zookeeper
- updateAssignedReplicasForPartition(topicAndPartition, reassignedPartitionContext)
+ updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
// update the /admin/reassign_partitions path to remove this partition
removePartitionFromReassignedPartitions(topicAndPartition)
info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
@@ -390,8 +394,15 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
case false =>
info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
"reassigned not yet caught up with the leader")
+ val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
+ val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
+ // write the expanded list of replicas to zookeeper
+ updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
+ // update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
+ updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
+ newAndOldReplicas.toSeq)
// start new replicas
- startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext)
+ startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
"reassigned to catch up with the leader")
}
@@ -602,6 +613,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignedReplicas = reassignedPartitionContext.newReplicas
val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
+ // change the assigned replica list to just the reassigned replicas in the cache so it gets sent out on the LeaderAndIsr
+ // request to the current or new leader. This will prevent it from adding the old replicas to the ISR
+ val oldAndNewReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
+ controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas)
if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
"is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
@@ -613,6 +628,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
case true =>
info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
"is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(",")))
+ // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
+ updateLeaderEpochAndSendRequest(topicAndPartition, oldAndNewReplicas, reassignedReplicas)
case false =>
info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
"is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(",")))
@@ -622,12 +639,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
private def stopOldReplicasOfReassignedPartition(topicAndPartition: TopicAndPartition,
- reassignedPartitionContext: ReassignedPartitionsContext) {
- val reassignedReplicas = reassignedPartitionContext.newReplicas
+ reassignedPartitionContext: ReassignedPartitionsContext,
+ oldReplicas: Set[Int]) {
val topic = topicAndPartition.topic
val partition = topicAndPartition.partition
- // send stop replica state change request to the old replicas
- val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
// first move the replica to offline state (the controller removes it from the ISR)
oldReplicas.foreach { replica =>
replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topic, partition, replica)), OfflineReplica)
@@ -639,31 +654,44 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
private def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
- reassignedPartitionContext: ReassignedPartitionsContext) {
- val reassignedReplicas = reassignedPartitionContext.newReplicas
+ replicas: Seq[Int]) {
val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic.equals(topicAndPartition.topic))
- partitionsAndReplicasForThisTopic.put(topicAndPartition, reassignedReplicas)
+ partitionsAndReplicasForThisTopic.put(topicAndPartition, replicas)
updateAssignedReplicasForPartition(topicAndPartition, partitionsAndReplicasForThisTopic)
- info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, reassignedReplicas.mkString(",")))
+ info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, replicas.mkString(",")))
// update the assigned replica list after a successful zookeeper write
- controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas)
- // stop watching the ISR changes for this partition
- zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
- controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
+ controllerContext.partitionReplicaAssignment.put(topicAndPartition, replicas)
}
private def startNewReplicasForReassignedPartition(topicAndPartition: TopicAndPartition,
- reassignedPartitionContext: ReassignedPartitionsContext) {
+ reassignedPartitionContext: ReassignedPartitionsContext,
+ newReplicas: Set[Int]) {
// send the start replica request to the brokers in the reassigned replicas list that are not in the assigned
// replicas list
- val assignedReplicaSet = Set.empty[Int] ++ controllerContext.partitionReplicaAssignment(topicAndPartition)
- val reassignedReplicaSet = Set.empty[Int] ++ reassignedPartitionContext.newReplicas
- val newReplicas: Seq[Int] = (reassignedReplicaSet -- assignedReplicaSet).toSeq
newReplicas.foreach { replica =>
replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, replica)), NewReplica)
}
}
+ private def updateLeaderEpochAndSendRequest(topicAndPartition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
+ brokerRequestBatch.newBatch()
+ updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match {
+ case Some(updatedLeaderIsrAndControllerEpoch) =>
+ // send the new assigned replica list (expanded or shrunk) to all the given replicas, including the leader, so that
+ // the leader only allows currently assigned replicas to enter the ISR
+ brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic,
+ topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
+ brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement)
+ stateChangeLogger.trace(("Controller %d epoch %d sent LeaderAndIsr request %s with new assigned replica list %s " +
+ "to leader %d for partition being reassigned %s").format(config.brokerId, controllerContext.epoch, updatedLeaderIsrAndControllerEpoch,
+ newAssignedReplicas.mkString(","), updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader, topicAndPartition))
+ case None => // fail the reassignment
+ stateChangeLogger.error(("Controller %d epoch %d failed to send LeaderAndIsr request with new assigned replica list %s " +
+ "to leader for partition being reassigned %s").format(config.brokerId, controllerContext.epoch,
+ newAssignedReplicas.mkString(","), topicAndPartition))
+ }
+ }
+
private def registerReassignedPartitionsListener() = {
zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, new PartitionsReassignedListener(this))
}
@@ -677,6 +705,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
+ // stop watching the ISR changes for this partition
+ zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
+ controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
// read the current list of reassigned partitions from zookeeper
val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
// remove this partition from that list
@@ -793,6 +824,52 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
finalLeaderIsrAndControllerEpoch
}
+ /**
+ * Does not change leader or isr, but just increments the leader epoch
+ * @param topic topic
+ * @param partition partition
+ * @return the new leaderAndIsr with an incremented leader epoch. Throws IllegalStateException if the leaderAndIsr path is empty.
+ */
+ private def updateLeaderEpoch(topic: String, partition: Int): Option[LeaderIsrAndControllerEpoch] = {
+ val topicAndPartition = TopicAndPartition(topic, partition)
+ debug("Updating leader epoch for partition %s.".format(topicAndPartition))
+ var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
+ var zkWriteCompleteOrUnnecessary = false
+ while (!zkWriteCompleteOrUnnecessary) {
+ // refresh leader and isr from zookeeper again
+ val leaderIsrAndEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
+ zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
+ case Some(leaderIsrAndEpoch) =>
+ val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
+ val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
+ if(controllerEpoch > epoch)
+ throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
+ "means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
+ "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
+ // increment the leader epoch even if there are no leader or isr changes to allow the leader to cache the expanded
+ // assigned replica list
+ val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
+ leaderAndIsr.isr, leaderAndIsr.zkVersion + 1)
+ // update the new leadership decision in zookeeper or retry
+ val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
+ zkClient,
+ ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+ ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch),
+ leaderAndIsr.zkVersion)
+ newLeaderAndIsr.zkVersion = newVersion
+ finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
+ if (updateSucceeded)
+ info("Updated leader epoch for partition %s to %d".format(topicAndPartition, newLeaderAndIsr.leaderEpoch))
+ updateSucceeded
+ case None =>
+ throw new IllegalStateException(("Cannot update leader epoch for partition %s as leaderAndIsr path is empty. " +
+ "This could mean we somehow tried to reassign a partition that doesn't exist").format(topicAndPartition))
+ true
+ }
+ }
+ finalLeaderIsrAndControllerEpoch
+ }
+
class SessionExpirationListener() extends IZkStateListener with Logging {
this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
@throws(classOf[Exception])
http://git-wip-us.apache.org/repos/asf/kafka/blob/df7f7a25/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 212c05d..c52225a 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -165,6 +165,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
replicaState.put((topic, partition, replicaId), OnlineReplica)
case OfflineReplica =>
assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
+ // send stop replica command to the replica so that it stops fetching from the leader
+ brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
// As an optimization, the controller removes dead replicas from the ISR
val leaderAndIsrIsEmpty: Boolean =
controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
http://git-wip-us.apache.org/repos/asf/kafka/blob/df7f7a25/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 7b8f89e..161f581 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -28,7 +28,7 @@ import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
import java.util.concurrent.TimeUnit
import kafka.common._
-import kafka.api.{StopReplicaRequest, LeaderAndIsrRequest}
+import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
import org.apache.log4j.Logger
@@ -143,7 +143,7 @@ class ReplicaManager(val config: KafkaConfig,
controllerEpoch = stopReplicaRequest.controllerEpoch
val responseMap = new HashMap[(String, Int), Short]
// First stop fetchers for all partitions, then stop the corresponding replicas
- replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map{
+ replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map {
case (topic, partition) => TopicAndPartition(topic, partition)
})
for((topic, partitionId) <- stopReplicaRequest.partitions){
@@ -222,27 +222,27 @@ class ReplicaManager(val config: KafkaConfig,
controllerEpoch = leaderAndISRRequest.controllerEpoch
// First check partition's leader epoch
- val partitionleaderIsrAndControllerEpoch = new HashMap[Partition, LeaderIsrAndControllerEpoch]()
+ val partitionState = new HashMap[Partition, PartitionStateInfo]()
leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) =>
val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
val partitionLeaderEpoch = partition.getLeaderEpoch()
if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) {
// If the leader epoch is valid record the epoch of the controller that made the leadership decision.
// This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
- partitionleaderIsrAndControllerEpoch.put(partition, partitionStateInfo.leaderIsrAndControllerEpoch)
+ partitionState.put(partition, partitionStateInfo)
} else {
// Otherwise record the error code in response
stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " +
"controller %d epoch %d with an older leader epoch %d for partition [%s,%d], current leader epoch is %d")
.format(localBrokerId, correlationId, controllerId, partitionStateInfo.leaderIsrAndControllerEpoch.controllerEpoch,
- partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition, partitionLeaderEpoch))
+ partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition.partitionId, partitionLeaderEpoch))
responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode)
}
}
- val partitionsTobeLeader = partitionleaderIsrAndControllerEpoch
- .filter{ case (partition, leaderIsrAndControllerEpoch) => leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
- val partitionsTobeFollower = (partitionleaderIsrAndControllerEpoch -- partitionsTobeLeader.keys)
+ val partitionsTobeLeader = partitionState
+ .filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
+ val partitionsTobeFollower = (partitionState -- partitionsTobeLeader.keys)
if (!partitionsTobeLeader.isEmpty) makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap)
if (!partitionsTobeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsTobeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap)
@@ -271,28 +271,30 @@ class ReplicaManager(val config: KafkaConfig,
* the error message will be set on each partition since we do not know which partition caused it
* TODO: the above may need to be fixed later
*/
- private def makeLeaders(controllerId: Int, epoch: Int, partitionLeaderISRAndControllerEpochs: Map[Partition, LeaderIsrAndControllerEpoch],
+ private def makeLeaders(controllerId: Int, epoch: Int,
+ partitionState: Map[Partition, PartitionStateInfo],
correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) = {
stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"starting the become-leader transition for partitions %s")
.format(localBrokerId, correlationId, controllerId, epoch,
- partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
+ partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
- for (partition <- partitionLeaderISRAndControllerEpochs.keys)
+ for (partition <- partitionState.keys)
responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
try {
// First stop fetchers for all the partitions
- replicaFetcherManager.removeFetcherForPartitions(partitionLeaderISRAndControllerEpochs.keySet.map(new TopicAndPartition(_)))
+ replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d"
- .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, correlationId))
+ .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
// Update the partition information to be the leader
- partitionLeaderISRAndControllerEpochs.foreach{ case (partition, leaderIsrAndControllerEpoch) => partition.makeLeader(controllerId, leaderIsrAndControllerEpoch, correlationId)}
+ partitionState.foreach{ case (partition, partitionStateInfo) =>
+ partition.makeLeader(controllerId, partitionStateInfo, correlationId)}
// Finally add these partitions to the list of partitions for which the leader is the current broker
leaderPartitionsLock synchronized {
- leaderPartitions ++= partitionLeaderISRAndControllerEpochs.keySet
+ leaderPartitions ++= partitionState.keySet
}
} catch {
case e: Throwable =>
@@ -305,7 +307,8 @@ class ReplicaManager(val config: KafkaConfig,
stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"for the become-leader transition for partitions %s")
- .format(localBrokerId, correlationId, controllerId, epoch, partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
+ .format(localBrokerId, correlationId, controllerId, epoch,
+ partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
}
/*
@@ -325,30 +328,33 @@ class ReplicaManager(val config: KafkaConfig,
* the error message will be set on each partition since we do not know which partition caused it
* TODO: the above may need to be fixed later
*/
- private def makeFollowers(controllerId: Int, epoch: Int, partitionLeaderISRAndControllerEpochs: Map[Partition, LeaderIsrAndControllerEpoch],
+ private def makeFollowers(controllerId: Int, epoch: Int,
+ partitionState: Map[Partition, PartitionStateInfo],
leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) {
stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"starting the become-follower transition for partitions %s")
.format(localBrokerId, correlationId, controllerId, epoch,
- partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
+ partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
- for (partition <- partitionLeaderISRAndControllerEpochs.keys)
+ for (partition <- partitionState.keys)
responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
try {
- replicaFetcherManager.removeFetcherForPartitions(partitionLeaderISRAndControllerEpochs.keySet.map(new TopicAndPartition(_)))
+ replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d"
- .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, correlationId))
+ .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
- logManager.truncateTo(partitionLeaderISRAndControllerEpochs.map{ case(partition, leaderISRAndControllerEpoch) =>
+ logManager.truncateTo(partitionState.map{ case(partition, leaderISRAndControllerEpoch) =>
new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark
})
stateChangeLogger.trace("Broker %d truncated logs and checkpoint recovery boundaries for partitions %s as per becoming-follower request from controller %d epoch %d"
- .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, correlationId))
+ .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
if (!isShuttingDown.get()) {
- replicaFetcherManager.addFetcherForPartitions(partitionLeaderISRAndControllerEpochs.map{ case(partition, leaderISRAndControllerEpoch) =>
- new TopicAndPartition(partition) -> BrokerAndInitialOffset(leaders.find(_.id == leaderISRAndControllerEpoch.leaderAndIsr.leader).get, partition.getReplica().get.logEndOffset)}
+ replicaFetcherManager.addFetcherForPartitions(partitionState.map{ case(partition, partitionStateInfo) =>
+ new TopicAndPartition(partition) ->
+ BrokerAndInitialOffset(leaders.find(_.id == partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader).get,
+ partition.getReplica().get.logEndOffset)}
)
}
else {
@@ -357,10 +363,11 @@ class ReplicaManager(val config: KafkaConfig,
.format(localBrokerId, correlationId, controllerId, epoch))
}
- partitionLeaderISRAndControllerEpochs.foreach{ case (partition, leaderIsrAndControllerEpoch) => partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)}
+ partitionState.foreach{ case (partition, leaderIsrAndControllerEpoch) =>
+ partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)}
leaderPartitionsLock synchronized {
- leaderPartitions --= partitionLeaderISRAndControllerEpochs.keySet
+ leaderPartitions --= partitionState.keySet
}
} catch {
case e: Throwable =>
@@ -373,7 +380,7 @@ class ReplicaManager(val config: KafkaConfig,
stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"for the become-follower transition for partitions %s")
- .format(localBrokerId, correlationId, controllerId, epoch, partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
+ .format(localBrokerId, correlationId, controllerId, epoch, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
}
private def maybeShrinkIsr(): Unit = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/df7f7a25/kafka-patch-review.py
----------------------------------------------------------------------
diff --git a/kafka-patch-review.py b/kafka-patch-review.py
index daf2c35..7fa6cb5 100644
--- a/kafka-patch-review.py
+++ b/kafka-patch-review.py
@@ -95,12 +95,12 @@ def main():
comment="Created reviewboard "
if not opt.reviewboard:
- print 'Created a new reviewboard ',rb_url,' against branch ',opt.branch
+ print 'Created a new reviewboard ',rb_url,
else:
- print 'Updated reviewboard',opt.reviewboard
+ print 'Updated reviewboard'
comment="Updated reviewboard "
- comment = comment + rb_url
+ comment = comment + rb_url + ' against branch ' + opt.branch
jira.add_comment(opt.jira, comment)
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/kafka/blob/df7f7a25/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index bcd1ca5..40e0c4f 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -44,7 +44,7 @@ object KafkaBuild extends Build {
crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2", "2.10.1"),
excludeFilter in unmanagedSources <<= scalaVersion(v => if (v.startsWith("2.8")) "*_2.9+.scala" else "*_2.8.scala"),
scalaVersion := "2.8.0",
- version := "0.8.0",
+ version := "0.8.1",
publishTo := Some("Apache Maven Repo" at "https://repository.apache.org/service/local/staging/deploy/maven2"),
credentials += Credentials(Path.userHome / ".m2" / ".credentials"),
buildNumber := System.getProperty("build.number", ""),