You are viewing a plain text version of this content; the canonical HTML version is available in the mailing-list archive.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/25 17:32:11 UTC
git commit: kafka-813; Minor cleanup in Controller;
patched by Swapnil Ghike; reviewed by Neha Narkhede and Jun Rao
Updated Branches:
refs/heads/0.8 51421fcc0 -> 28ee78553
kafka-813; Minor cleanup in Controller; patched by Swapnil Ghike; reviewed by Neha Narkhede and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/28ee7855
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/28ee7855
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/28ee7855
Branch: refs/heads/0.8
Commit: 28ee7855360b8e6ea690fc802dac4eaa60ad81e2
Parents: 51421fc
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Mon Mar 25 09:31:57 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Mar 25 09:31:57 2013 -0700
----------------------------------------------------------------------
.../main/scala/kafka/api/LeaderAndIsrRequest.scala | 7 +-
.../kafka/common/PartitionOfflineException.scala | 28 ---------
.../controller/ControllerChannelManager.scala | 6 +-
.../scala/kafka/controller/KafkaController.scala | 47 +++++++++-----
.../kafka/controller/PartitionLeaderSelector.scala | 44 +++++++++-----
.../kafka/controller/PartitionStateMachine.scala | 28 ++++-----
.../kafka/controller/ReplicaStateMachine.scala | 6 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 2 +-
.../test/scala/unit/kafka/admin/AdminTest.scala | 6 +-
.../api/RequestResponseSerializationTest.scala | 2 +-
.../unit/kafka/server/LeaderElectionTest.scala | 3 +-
11 files changed, 87 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index b40522d..3b7ee24 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -83,7 +83,6 @@ case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndContr
object LeaderAndIsrRequest {
val CurrentVersion = 0.shortValue
- val DefaultClientId = ""
val IsInit: Boolean = true
val NotInit: Boolean = false
val DefaultAckTimeout: Int = 1000
@@ -126,9 +125,9 @@ case class LeaderAndIsrRequest (versionId: Short,
extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey), correlationId) {
def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerId: Int,
- controllerEpoch: Int, correlationId: Int) = {
- this(LeaderAndIsrRequest.CurrentVersion, correlationId, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
- controllerId, controllerEpoch, partitionStateInfos, liveBrokers)
+ controllerEpoch: Int, correlationId: Int, clientId: String) = {
+ this(LeaderAndIsrRequest.CurrentVersion, correlationId, clientId, LeaderAndIsrRequest.DefaultAckTimeout,
+ controllerId, controllerEpoch, partitionStateInfos, liveBrokers)
}
def writeTo(buffer: ByteBuffer) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/common/PartitionOfflineException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/PartitionOfflineException.scala b/core/src/main/scala/kafka/common/PartitionOfflineException.scala
deleted file mode 100644
index 3367708..0000000
--- a/core/src/main/scala/kafka/common/PartitionOfflineException.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-
-/**
- * This exception is thrown by the leader elector in the controller when leader election fails for a partition since
- * all the replicas for a partition are offline
- */
-class PartitionOfflineException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
- def this(message: String) = this(message, null)
- def this() = this(null, null)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 6e563d2..f7a7bd4 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -144,7 +144,8 @@ class RequestSendThread(val controllerId: Int,
}
}
-class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit, controllerId: Int)
+class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit,
+ controllerId: Int, clientId: String)
extends Logging {
val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
@@ -190,7 +191,8 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
val partitionStateInfos = m._2.toMap
val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
- val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId)
+ val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId,
+ clientId)
for (p <- partitionStateInfos) {
val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request with correlationId %d to broker %d " +
http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 25a8cfe..6e07096 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -46,7 +46,7 @@ class ControllerContext(val zkClient: ZkClient,
val correlationId: AtomicInteger = new AtomicInteger(0),
var allTopics: Set[String] = Set.empty,
var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty,
- var allLeaders: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty,
+ var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty,
var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] =
new mutable.HashMap,
var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] =
@@ -87,10 +87,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
private val replicaStateMachine = new ReplicaStateMachine(this)
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
config.brokerId)
+ val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
- private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest, config.brokerId)
+ private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest, this.config.brokerId, this.clientId)
registerControllerChangedListener()
newGauge(
@@ -100,8 +101,21 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
)
+ newGauge(
+ "OfflinePartitionsCount",
+ new Gauge[Int] {
+ def getValue: Int = {
+ controllerContext.controllerLock synchronized {
+ controllerContext.partitionLeadershipInfo.count(p => !controllerContext.liveBrokerIds.contains(p._2.leaderAndIsr.leader))
+ }
+ }
+ }
+ )
+
def epoch = controllerContext.epoch
+ def clientId = "id_%d-host_%s-port_%d".format(config.brokerId, config.hostName, config.port)
+
/**
* JMX operation to initiate clean shutdown of a broker. On clean shutdown,
* the controller first determines the partitions that the shutting down
@@ -137,8 +151,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
def replicatedPartitionsBrokerLeads() = controllerContext.controllerLock.synchronized {
- trace("All leaders = " + controllerContext.allLeaders.mkString(","))
- controllerContext.allLeaders.filter {
+ trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
+ controllerContext.partitionLeadershipInfo.filter {
case (topicAndPartition, leaderIsrAndControllerEpoch) =>
leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
}.map(_._1)
@@ -151,11 +165,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
val (topic, partition) = topicAndPartition.asTuple
// move leadership serially to relinquish lock.
controllerContext.controllerLock synchronized {
- controllerContext.allLeaders.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
+ controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
- controlledShutdownPartitionLeaderSelector)
- val newLeaderIsrAndControllerEpoch = controllerContext.allLeaders(topicAndPartition)
+ controlledShutdownPartitionLeaderSelector)
+ val newLeaderIsrAndControllerEpoch = controllerContext.partitionLeadershipInfo(topicAndPartition)
// mark replica offline only if leadership was moved successfully
if (newLeaderIsrAndControllerEpoch.leaderAndIsr.leader != currLeaderIsrAndControllerEpoch.leaderAndIsr.leader)
@@ -180,7 +194,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
allPartitionsAndReplicationFactorOnBroker foreach {
case(topicAndPartition, replicationFactor) =>
val (topic, partition) = topicAndPartition.asTuple
- if (controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader != id) {
+ if (controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader != id) {
brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
removeReplicaFromIsr(topic, partition, id) match {
case Some(updatedLeaderIsrAndControllerEpoch) =>
@@ -289,7 +303,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
val deadBrokersSet = deadBrokers.toSet
// trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
- val partitionsWithoutLeader = controllerContext.allLeaders.filter(partitionAndLeader =>
+ val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader)).keySet
partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
// trigger OnlinePartition state changes for offline or new partitions
@@ -321,7 +335,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
info("New partition creation callback for %s".format(newPartitions.mkString(",")))
partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), NewReplica)
- partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition)
+ partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), OnlineReplica)
}
@@ -450,7 +464,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq)
- controllerContext.allLeaders = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
+ controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
// update the leader and isr cache for all existing partitions from Zookeeper
updateLeaderAndIsrCache()
// start the channel manager
@@ -482,7 +496,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
val partitionsUndergoingPreferredReplicaElection = ZkUtils.getPartitionsUndergoingPreferredReplicaElection(zkClient)
// check if they are already completed
val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter(partition =>
- controllerContext.allLeaders(partition).leaderAndIsr.leader == controllerContext.partitionReplicaAssignment(partition).head)
+ controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == controllerContext.partitionReplicaAssignment(partition).head)
controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitionsUndergoingPreferredReplicaElection
controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsThatCompletedPreferredReplicaElection
info("Partitions undergoing preferred replica election: %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
@@ -502,7 +516,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
// If the leader specified in the leaderAndIsr is no longer alive, there is no need to recover it
controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
case true =>
- controllerContext.allLeaders.put(topicPartition, leaderIsrAndControllerEpoch)
+ controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
case false =>
debug("While refreshing controller's leader and isr cache, leader %d for ".format(leaderIsrAndControllerEpoch.leaderAndIsr.leader) +
"partition %s is dead, just ignore it".format(topicPartition))
@@ -522,7 +536,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition,
reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignedReplicas = reassignedPartitionContext.newReplicas
- val currentLeader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
+ val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
"is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
@@ -626,7 +640,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
for(partition <- partitionsToBeRemoved) {
// check the status
- val currentLeader = controllerContext.allLeaders(partition).leaderAndIsr.leader
+ val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
if(currentLeader == preferredReplica) {
info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica))
@@ -965,7 +979,6 @@ case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)
object ControllerStats extends KafkaMetricsGroup {
- val offlinePartitionRate = newMeter("OfflinePartitionsPerSec", "partitions", TimeUnit.SECONDS)
- val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
+ val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 3ed9b7e..d295781 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -18,14 +18,14 @@ package kafka.controller
import kafka.api.LeaderAndIsr
import kafka.utils.Logging
-import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOfflineException}
+import kafka.common.{TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
trait PartitionLeaderSelector {
/**
* @param topicAndPartition The topic and partition whose leader needs to be elected
* @param currentLeaderAndIsr The current leader and isr of input partition read from zookeeper
- * @throws PartitionOfflineException If no replica in the assigned replicas list is alive
+ * @throws NoReplicaOnlineException If no replica in the assigned replicas list is alive
* @return The leader and isr request, with the newly selected leader info, to send to the brokers
* Also, returns the list of replicas the returned leader and isr request should be sent to
* This API selects a new leader for the input partition
@@ -38,7 +38,7 @@ trait PartitionLeaderSelector {
* This API selects a new leader for the input partition -
* 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader
* 2. Else, it picks some alive broker from the assigned replica list as the new leader
- * 3. If no broker in the assigned replica list is alive, it throws PartitionOfflineException
+ * 3. If no broker in the assigned replica list is alive, it throws NoReplicaOnlineException
* Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
*/
class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
@@ -57,8 +57,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
.format(liveAssignedReplicasToThisPartition.mkString(",")))
liveAssignedReplicasToThisPartition.isEmpty match {
case true =>
- ControllerStats.offlinePartitionRate.mark()
- throw new PartitionOfflineException(("No replica for partition " +
+ throw new NoReplicaOnlineException(("No replica for partition " +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
" Assigned replicas are: [%s]".format(assignedReplicas))
case false =>
@@ -76,30 +75,31 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
(newLeaderAndIsr, liveAssignedReplicasToThisPartition)
case None =>
- ControllerStats.offlinePartitionRate.mark()
- throw new PartitionOfflineException("Partition %s doesn't have".format(topicAndPartition) + "replicas assigned to it")
+ throw new NoReplicaOnlineException("Partition %s doesn't have".format(topicAndPartition) + " replicas assigned to it")
}
}
}
/**
- * Picks one of the alive in-sync reassigned replicas as the new leader
+ * Picks one of the alive in-sync reassigned replicas as the new leader.
*/
class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
this.logIdent = "[ReassignedPartitionLeaderSelector]: "
+ /**
+ * The reassigned replicas are already in the ISR when selectLeader is called.
+ */
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
- val reassignedReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
+ val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
- // pick any replica from the newly assigned replicas list that is in the ISR
- val aliveReassignedReplicas = reassignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
- val newLeaderOpt = aliveReassignedReplicas.headOption
+ val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+ val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
newLeaderOpt match {
case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
- currentLeaderIsrZkPathVersion + 1), reassignedReplicas)
+ currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
case None =>
- reassignedReplicas.size match {
+ reassignedInSyncReplicas.size match {
case 0 =>
throw new StateChangeFailedException("List of reassigned replicas for partition " +
" %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
@@ -124,7 +124,7 @@ with Logging {
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
val preferredReplica = assignedReplicas.head
// check if preferred replica is the current leader
- val currentLeader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
+ val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
if(currentLeader == preferredReplica) {
throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition %s"
.format(preferredReplica, topicAndPartition))
@@ -177,6 +177,18 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
" shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, currentLeader, controllerContext.shuttingDownBrokerIds.mkString(",")))
}
}
-
}
+/**
+ * Essentially does nothing. Returns the current leader and ISR, and the current
+ * set of replicas assigned to a given topic/partition.
+ */
+class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
+
+ this.logIdent = "[NoOpLeaderSelector]: "
+
+ def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+ warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.")
+ (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index b25e9f4..654fa2e 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -20,7 +20,7 @@ import collection._
import collection.JavaConversions._
import java.util.concurrent.atomic.AtomicBoolean
import kafka.api.LeaderAndIsr
-import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOfflineException}
+import kafka.common.{TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
import kafka.utils.{Logging, ZkUtils}
import org.I0Itec.zkclient.IZkChildListener
import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -43,9 +43,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
private val controllerId = controller.config.brokerId
private val zkClient = controllerContext.zkClient
var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
- val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId)
- val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
+ val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId, controller.clientId)
private val isShuttingDown = new AtomicBoolean(false)
+ private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
@@ -86,7 +86,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
// try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state
for((topicAndPartition, partitionState) <- partitionState) {
if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
- handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, offlinePartitionSelector)
+ handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector)
}
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
} catch {
@@ -101,7 +101,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
* @param targetState The state that the partitions should be moved to
*/
def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
- leaderSelector: PartitionLeaderSelector = offlinePartitionSelector) {
+ leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector) {
info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
try {
brokerRequestBatch.newBatch()
@@ -111,6 +111,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
}catch {
case e => error("Error while moving some partitions to %s state".format(targetState), e)
+ // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
}
}
@@ -149,7 +150,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
case _ => // should never come here since illegal previous states are checked above
}
partitionState.put(topicAndPartition, OnlinePartition)
- val leader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
+ val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to OnlinePartition with leader %d"
.format(controllerId, controller.epoch, topicAndPartition, partitionState(topicAndPartition), leader))
// post: partition has a leader
@@ -172,7 +173,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
} catch {
case t: Throwable =>
stateChangeLogger.error("Controller %d epoch %d initiated state change for partition %s from %s to %s failed"
- .format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t)
+ .format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t)
}
}
@@ -232,7 +233,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
liveAssignedReplicas.size match {
case 0 =>
- ControllerStats.offlinePartitionRate.mark()
val failMsg = ("encountered error during state change of partition %s from New to Online, assigned replicas are [%s], " +
"live brokers are [%s]. No assigned replica is alive.")
.format(topicAndPartition, replicaAssignment.mkString(","), controllerContext.liveBrokerIds)
@@ -253,14 +253,12 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
// GC pause
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment.size)
- controllerContext.allLeaders.put(topicAndPartition, leaderIsrAndControllerEpoch)
- partitionState.put(topicAndPartition, OnlinePartition)
+ controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
} catch {
case e: ZkNodeExistsException =>
// read the controller epoch
val leaderIsrAndEpoch = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
topicAndPartition.partition).get
- ControllerStats.offlinePartitionRate.mark()
val failMsg = ("encountered error while changing partition %s's state from New to Online since LeaderAndIsr path already " +
"exists with value %s and controller epoch %d")
.format(topicAndPartition, leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch)
@@ -310,22 +308,20 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
}
val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
// update the leader cache
- controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
+ controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
stateChangeLogger.trace("Controller %d epoch %d elected leader %d for Offline partition %s"
.format(controllerId, controller.epoch, newLeaderAndIsr.leader, topicAndPartition))
// store new leader and isr info in cache
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
} catch {
- case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas %s for partition %s are dead."
- .format(controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(","), topicAndPartition) +
- " Marking this partition offline", poe)
+ case nroe: NoReplicaOnlineException => throw nroe
case sce =>
val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
throw new StateChangeFailedException(failMsg, sce)
}
- debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2))))
+ debug("After leader election, leader cache is updated to %s".format(controllerContext.partitionLeadershipInfo.map(l => (l._1, l._2))))
}
private def registerTopicChangeListener() = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 88058ec..5146f12 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -42,7 +42,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
private val controllerId = controller.config.brokerId
private val zkClient = controllerContext.zkClient
var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty
- val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controller.config.brokerId)
+ val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId, controller.clientId)
private val isShuttingDown = new AtomicBoolean(false)
this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
@@ -143,7 +143,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
.format(controllerId, controller.epoch, replicaId, topicAndPartition))
case _ =>
// check if the leader for this partition is alive or even exists
- controllerContext.allLeaders.get(topicAndPartition) match {
+ controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
case Some(leaderIsrAndControllerEpoch) =>
controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
case true => // leader is alive
@@ -163,7 +163,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
// As an optimization, the controller removes dead replicas from the ISR
val leaderAndIsrIsEmpty: Boolean =
- controllerContext.allLeaders.get(topicAndPartition) match {
+ controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
case Some(currLeaderIsrAndControllerEpoch) =>
if (currLeaderIsrAndControllerEpoch.leaderAndIsr.isr.contains(replicaId))
controller.removeReplicaFromIsr(topic, partition, replicaId) match {
http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index e7248c3..7298ccb 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -98,8 +98,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
*/
private def registerStats() {
BrokerTopicStats.getBrokerAllTopicsStats()
- ControllerStats.offlinePartitionRate
ControllerStats.uncleanLeaderElectionRate
+ ControllerStats.leaderElectionTimer
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 4dd1ba7..6c80c4c 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -381,7 +381,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
// assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size)
- assertEquals(2, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+ assertEquals(2, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
leaderBeforeShutdown = leaderAfterShutdown
controllerId = ZkUtils.getController(zkClient)
@@ -392,7 +392,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
// assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
- assertEquals(1, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+ assertEquals(1, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
leaderBeforeShutdown = leaderAfterShutdown
controllerId = ZkUtils.getController(zkClient)
@@ -402,7 +402,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
assertTrue(leaderAfterShutdown == leaderBeforeShutdown)
- assertEquals(1, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+ assertEquals(1, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
}
finally {
servers.foreach(_.shutdown())
http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 02ff81f..0f15718 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -89,7 +89,7 @@ object SerializationTestUtils{
val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1)
val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
- new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 0, 1, 0)
+ new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 0, 1, 0, "")
}
def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index ec1db2d..8f88177 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -133,7 +133,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
leaderAndIsr.put((topic, partitionId),
new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2))
val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 1)).toMap
- val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId, staleControllerEpoch, 0)
+ val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId,
+ staleControllerEpoch, 0, "")
controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, 1000)