You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/05/08 17:45:50 UTC

kafka git commit: KAFKA-3670; ControlledShutdownLeaderSelector should pick the preferred replica as the new leader, if possible

Repository: kafka
Updated Branches:
  refs/heads/trunk 8fe255223 -> 51f7a35c9


KAFKA-3670; ControlledShutdownLeaderSelector should pick the preferred replica as the new leader, if possible

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #1338 from ijuma/kafka-3670-controlled-shutdown-leader-selector-preferred-replica


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/51f7a35c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/51f7a35c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/51f7a35c

Branch: refs/heads/trunk
Commit: 51f7a35c929d9aa04d821098a2266902f9178d7c
Parents: 8fe2552
Author: Ismael Juma <is...@juma.me.uk>
Authored: Sun May 8 10:45:47 2016 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Sun May 8 10:45:47 2016 -0700

----------------------------------------------------------------------
 .../controller/PartitionLeaderSelector.scala    |  9 +--
 .../ControlledShutdownLeaderSelectorTest.scala  | 73 ++++++++++++++++++++
 2 files changed, 76 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/51f7a35c/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 5eed382..9d8b0b6 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -185,13 +185,10 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
     val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
 
     val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
-    val newLeaderOpt = newIsr.headOption
-    newLeaderOpt match {
+    liveAssignedReplicas.filter(newIsr.contains).headOption match {
       case Some(newLeader) =>
-        debug("Partition %s : current leader = %d, new leader = %d"
-              .format(topicAndPartition, currentLeader, newLeader))
-        (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
-         liveAssignedReplicas)
+        debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader))
+        (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas)
       case None =>
         throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +
           " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))

http://git-wip-us.apache.org/repos/asf/kafka/blob/51f7a35c/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
new file mode 100644
index 0000000..f032eb6
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
@@ -0,0 +1,73 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import kafka.api.LeaderAndIsr
+import kafka.common.TopicAndPartition
+import kafka.controller.{ControlledShutdownLeaderSelector, ControllerContext}
+import org.easymock.EasyMock
+import org.junit.{Assert, Test}
+import Assert._
+import kafka.cluster.Broker
+import kafka.utils.ZkUtils
+
+import scala.collection.mutable
+
+class ControlledShutdownLeaderSelectorTest {
+
+  @Test
+  def testSelectLeader() {
+    val topicPartition = TopicAndPartition("topic", 1)
+    val assignment = Seq(6, 5, 4, 3, 2, 1)
+    val preferredReplicaId = assignment.head
+
+    val firstIsr = List(1, 3, 6)
+    val firstLeader = 1
+
+    val zkUtils = EasyMock.mock(classOf[ZkUtils])
+    val controllerContext = new ControllerContext(zkUtils, zkSessionTimeout = 1000)
+    controllerContext.liveBrokers = assignment.map(Broker(_, Map.empty, None)).toSet
+    controllerContext.shuttingDownBrokerIds = mutable.Set(2, 3)
+    controllerContext.partitionReplicaAssignment = mutable.Map(topicPartition -> assignment)
+
+    val leaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
+    val firstLeaderAndIsr = new LeaderAndIsr(firstLeader, firstIsr)
+    val (secondLeaderAndIsr, secondReplicas) = leaderSelector.selectLeader(topicPartition, firstLeaderAndIsr)
+
+    assertEquals(preferredReplicaId, secondLeaderAndIsr.leader)
+    assertEquals(Seq(1, 6), secondLeaderAndIsr.isr)
+    assertEquals(1, secondLeaderAndIsr.zkVersion)
+    assertEquals(1, secondLeaderAndIsr.leaderEpoch)
+    assertEquals(assignment, secondReplicas)
+
+    controllerContext.shuttingDownBrokerIds += preferredReplicaId
+
+    val deadBrokerId = 2
+    controllerContext.liveBrokers = controllerContext.liveOrShuttingDownBrokers.filter(_.id != deadBrokerId)
+    controllerContext.shuttingDownBrokerIds -= deadBrokerId
+
+    val (thirdLeaderAndIsr, thirdReplicas) = leaderSelector.selectLeader(topicPartition, secondLeaderAndIsr)
+
+    assertEquals(1, thirdLeaderAndIsr.leader)
+    assertEquals(Seq(1), thirdLeaderAndIsr.isr)
+    assertEquals(2, thirdLeaderAndIsr.zkVersion)
+    assertEquals(2, thirdLeaderAndIsr.leaderEpoch)
+    assertEquals(Seq(6, 5, 4, 3, 1), thirdReplicas)
+
+  }
+
+}