You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/10/31 00:27:36 UTC
git commit: KAFKA-1647;
Create replicas on follower transition even if leader is unavailable;
otherwise replication offset checkpoints (high water marks) can be lost on
hard kills and restarts;
reviewed by Joel Koshy, Neha Narkhede, Jun Rao and Guozhang Wang
Repository: kafka
Updated Branches:
refs/heads/trunk 89831204c -> 1ed9cf6d0
KAFKA-1647; Create replicas on follower transition even if leader is
unavailable, otherwise replication offset checkpoints (high water marks)
can be lost on hard kills and restarts; reviewed by Joel Koshy, Neha
Narkhede, Jun Rao and Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1ed9cf6d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1ed9cf6d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1ed9cf6d
Branch: refs/heads/trunk
Commit: 1ed9cf6d03603518d950f7e9a5f122c4ed5d7cee
Parents: 8983120
Author: Jiangjie Qin <be...@gmail.com>
Authored: Thu Oct 30 16:06:13 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Thu Oct 30 16:06:21 2014 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/server/ReplicaManager.scala | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ed9cf6d/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 02fa382..3007a6d 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -64,8 +64,8 @@ object ReplicaManager {
}
class ReplicaManager(val config: KafkaConfig,
- time: Time,
- val zkClient: ZkClient,
+ time: Time,
+ val zkClient: ZkClient,
scheduler: Scheduler,
val logManager: LogManager,
val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup {
@@ -621,6 +621,7 @@ class ReplicaManager(val config: KafkaConfig,
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
leaders.find(_.id == newLeaderBrokerId) match {
+ // Only change partition state when the leader is available
case Some(leaderBroker) =>
if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager))
partitionsToMakeFollower += partition
@@ -632,10 +633,13 @@ class ReplicaManager(val config: KafkaConfig,
case None =>
// The leader broker should always be present in the leaderAndIsrRequest.
// If not, we should record the error message and abort the transition process for this partition
- stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " +
- "controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available")
+ stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
+ " %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.")
.format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
partition.topic, partition.partitionId, newLeaderBrokerId))
+ // Create the local replica even if the leader is unavailable. This is required to ensure that we include
+ // the partition's high watermark in the checkpoint file (see KAFKA-1647)
+ partition.getOrCreateReplica()
}
}