You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/10/31 00:27:36 UTC

git commit: KAFKA-1647; Create replicas on follower transition even if leader is unavailable, otherwise replication offset checkpoints (high water marks) can be lost on hard kills and restarts; reviewed by Joel Koshy, Neha Narkhede, Jun Rao and Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk 89831204c -> 1ed9cf6d0


KAFKA-1647; Create replicas on follower transition even if leader is
unavailable, otherwise replication offset checkpoints (high water marks)
can be lost on hard kills and restarts; reviewed by Joel Koshy, Neha
Narkhede, Jun Rao and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1ed9cf6d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1ed9cf6d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1ed9cf6d

Branch: refs/heads/trunk
Commit: 1ed9cf6d03603518d950f7e9a5f122c4ed5d7cee
Parents: 8983120
Author: Jiangjie Qin <be...@gmail.com>
Authored: Thu Oct 30 16:06:13 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Thu Oct 30 16:06:21 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/ReplicaManager.scala | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1ed9cf6d/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 02fa382..3007a6d 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -64,8 +64,8 @@ object ReplicaManager {
 }
 
 class ReplicaManager(val config: KafkaConfig,
-                     time: Time, 
-                     val zkClient: ZkClient, 
+                     time: Time,
+                     val zkClient: ZkClient,
                      scheduler: Scheduler,
                      val logManager: LogManager,
                      val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup {
@@ -621,6 +621,7 @@ class ReplicaManager(val config: KafkaConfig,
         val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
         val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
         leaders.find(_.id == newLeaderBrokerId) match {
+          // Only change partition state when the leader is available
           case Some(leaderBroker) =>
             if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager))
               partitionsToMakeFollower += partition
@@ -632,10 +633,13 @@ class ReplicaManager(val config: KafkaConfig,
           case None =>
             // The leader broker should always be present in the leaderAndIsrRequest.
             // If not, we should record the error message and abort the transition process for this partition
-            stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " +
-              "controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available")
+            stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
+              " %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.")
               .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
               partition.topic, partition.partitionId, newLeaderBrokerId))
+            // Create the local replica even if the leader is unavailable. This is required to ensure that we include
+            // the partition's high watermark in the checkpoint file (see KAFKA-1647)
+            partition.getOrCreateReplica()
         }
       }