You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this rendering.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:04:05 UTC
[20/36] git commit: KAFKA-999 Controlled shutdown never succeeds until the broker is killed; reviewed by Neha Narkhede
KAFKA-999 Controlled shutdown never succeeds until the broker is killed; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9753d15e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9753d15e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9753d15e
Branch: refs/heads/trunk
Commit: 9753d15ef1da0ecab9bbee51d8cdc08c2159c50b
Parents: 1d6ad3d
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Tue Aug 6 20:41:59 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Aug 6 20:42:08 2013 -0700
----------------------------------------------------------------------
.../scala/kafka/api/LeaderAndIsrRequest.scala | 14 ++++++-------
.../main/scala/kafka/cluster/Partition.scala | 21 ++++++++++----------
.../controller/ControllerChannelManager.scala | 7 +++----
.../kafka/controller/KafkaController.scala | 4 ++--
.../controller/PartitionStateMachine.scala | 4 ++--
.../kafka/controller/ReplicaStateMachine.scala | 2 +-
.../scala/kafka/server/ReplicaManager.scala | 6 +++---
7 files changed, 29 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index a474474..981d2bb 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -135,13 +135,13 @@ case class LeaderAndIsrRequest (versionId: Short,
controllerId: Int,
controllerEpoch: Int,
partitionStateInfos: Map[(String, Int), PartitionStateInfo],
- aliveLeaders: Set[Broker])
+ leaders: Set[Broker])
extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey), correlationId) {
- def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], aliveLeaders: Set[Broker], controllerId: Int,
+ def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[Broker], controllerId: Int,
controllerEpoch: Int, correlationId: Int, clientId: String) = {
this(LeaderAndIsrRequest.CurrentVersion, correlationId, clientId,
- controllerId, controllerEpoch, partitionStateInfos, aliveLeaders)
+ controllerId, controllerEpoch, partitionStateInfos, leaders)
}
def writeTo(buffer: ByteBuffer) {
@@ -156,8 +156,8 @@ case class LeaderAndIsrRequest (versionId: Short,
buffer.putInt(key._2)
value.writeTo(buffer)
}
- buffer.putInt(aliveLeaders.size)
- aliveLeaders.foreach(_.writeTo(buffer))
+ buffer.putInt(leaders.size)
+ leaders.foreach(_.writeTo(buffer))
}
def sizeInBytes(): Int = {
@@ -171,7 +171,7 @@ case class LeaderAndIsrRequest (versionId: Short,
for((key, value) <- partitionStateInfos)
size += (2 + key._1.length) /* topic */ + 4 /* partition */ + value.sizeInBytes /* partition state info */
size += 4 /* number of leader brokers */
- for(broker <- aliveLeaders)
+ for(broker <- leaders)
size += broker.sizeInBytes /* broker info */
size
}
@@ -185,7 +185,7 @@ case class LeaderAndIsrRequest (versionId: Short,
leaderAndIsrRequest.append(";CorrelationId:" + correlationId)
leaderAndIsrRequest.append(";ClientId:" + clientId)
leaderAndIsrRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
- leaderAndIsrRequest.append(";Leaders:" + aliveLeaders.mkString(","))
+ leaderAndIsrRequest.append(";Leaders:" + leaders.mkString(","))
leaderAndIsrRequest.toString()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 88fc8dd..a9bb3c8 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -167,7 +167,7 @@ class Partition(val topic: String,
* 4. start a fetcher to the new leader
*/
def makeFollower(controllerId: Int, topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
- aliveLeaders: Set[Broker], correlationId: Int): Boolean = {
+ leaders: Set[Broker], correlationId: Int): Boolean = {
leaderIsrUpdateLock synchronized {
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
if (leaderEpoch >= leaderAndIsr.leaderEpoch) {
@@ -186,7 +186,8 @@ class Partition(val topic: String,
// on the leader
val localReplica = getOrCreateReplica()
val newLeaderBrokerId: Int = leaderAndIsr.leader
- aliveLeaders.find(_.id == newLeaderBrokerId) match {
+ // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
+ leaders.find(_.id == newLeaderBrokerId) match {
case Some(leaderBroker) =>
// stop fetcher thread to previous leader
replicaFetcherManager.removeFetcher(topic, partitionId)
@@ -200,15 +201,15 @@ class Partition(val topic: String,
replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
}
else {
- stateChangeLogger.trace("Broker %d ignored the become-follower state change with correlation id %d from " +
- " controller %d epoch %d since it is shutting down"
- .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch))
+ stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " +
+ "controller %d epoch %d since it is shutting down")
+ .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch))
}
- case None => // leader went down
- stateChangeLogger.trace("Broker %d aborted the become-follower state change with correlation id %d from " +
- " controller %d epoch %d since leader %d for partition [%s,%d] is unavailable during the state change operation"
- .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
- newLeaderBrokerId, topic, partitionId))
+ case None => // we should not come here
+ stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " +
+ "controller %d epoch %d for partition [%s,%d] new leader %d")
+ .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
+ topic, partitionId, newLeaderBrokerId))
}
true
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 38b8674..ed1ce0b 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -229,14 +229,13 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
}
}
- def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int, liveBrokers: Set[Broker]) {
+ def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int) {
leaderAndIsrRequestMap.foreach { m =>
val broker = m._1
val partitionStateInfos = m._2.toMap
val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
- val aliveLeaders = liveBrokers.filter(b => leaderIds.contains(b.id))
- val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, aliveLeaders, controllerId, controllerEpoch, correlationId,
- clientId)
+ val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id))
+ val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId)
for (p <- partitionStateInfos) {
val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request with correlationId %d to broker %d " +
http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index b07e27b..c87caab 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -168,7 +168,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
// before which the stop replica request should be completed (in most cases)
brokerRequestBatch.newBatch()
brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, topicAndPartition.partition, deletePartition = false)
- brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
+ brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
// If the broker is a follower, updates the isr in ZK and notifies the current leader
replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
@@ -656,7 +656,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
private def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
brokerRequestBatch.newBatch()
brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
- brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
+ brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 0135d45..a084830 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -89,7 +89,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector)
}
- brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
+ brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
} catch {
case e => error("Error while moving some partitions to the online state", e)
// TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions
@@ -109,7 +109,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
partitions.foreach { topicAndPartition =>
handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector)
}
- brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
+ brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
}catch {
case e => error("Error while moving some partitions to %s state".format(targetState), e)
// TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 0c9d436..9f752f4 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -87,7 +87,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
try {
brokerRequestBatch.newBatch()
replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
- brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
+ brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
}catch {
case e => error("Error while moving some replicas to %s state".format(targetState), e)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d885ba1..f551243 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -220,7 +220,7 @@ class ReplicaManager(val config: KafkaConfig,
if(requestedLeaderId == config.brokerId)
makeLeader(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.correlationId)
else
- makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.aliveLeaders,
+ makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders,
leaderAndISRRequest.correlationId)
} catch {
case e =>
@@ -264,14 +264,14 @@ class ReplicaManager(val config: KafkaConfig,
}
private def makeFollower(controllerId: Int, epoch: Int, topic: String, partitionId: Int,
- partitionStateInfo: PartitionStateInfo, aliveLeaders: Set[Broker], correlationId: Int) {
+ partitionStateInfo: PartitionStateInfo, leaders: Set[Broker], correlationId: Int) {
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"starting the become-follower transition for partition [%s,%d]")
.format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId))
val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
- if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, aliveLeaders, correlationId)) {
+ if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, leaders, correlationId)) {
// remove this replica's partition from the ISR expiration queue
leaderPartitionsLock synchronized {
leaderPartitions -= partition