You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/05/08 17:45:50 UTC
kafka git commit: KAFKA-3670; ControlledShutdownLeaderSelector should pick the preferred replica as the new leader, if possible
Repository: kafka
Updated Branches:
refs/heads/trunk 8fe255223 -> 51f7a35c9
KAFKA-3670; ControlledShutdownLeaderSelector should pick the preferred replica as the new leader, if possible
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #1338 from ijuma/kafka-3670-controlled-shutdown-leader-selector-preferred-replica
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/51f7a35c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/51f7a35c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/51f7a35c
Branch: refs/heads/trunk
Commit: 51f7a35c929d9aa04d821098a2266902f9178d7c
Parents: 8fe2552
Author: Ismael Juma <is...@juma.me.uk>
Authored: Sun May 8 10:45:47 2016 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Sun May 8 10:45:47 2016 -0700
----------------------------------------------------------------------
.../controller/PartitionLeaderSelector.scala | 9 +--
.../ControlledShutdownLeaderSelectorTest.scala | 73 ++++++++++++++++++++
2 files changed, 76 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/51f7a35c/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 5eed382..9d8b0b6 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -185,13 +185,10 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
- val newLeaderOpt = newIsr.headOption
- newLeaderOpt match {
+ liveAssignedReplicas.filter(newIsr.contains).headOption match {
case Some(newLeader) =>
- debug("Partition %s : current leader = %d, new leader = %d"
- .format(topicAndPartition, currentLeader, newLeader))
- (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
- liveAssignedReplicas)
+ debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader))
+ (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas)
case None =>
throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +
" shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))
http://git-wip-us.apache.org/repos/asf/kafka/blob/51f7a35c/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
new file mode 100644
index 0000000..f032eb6
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.api.LeaderAndIsr
+import kafka.common.TopicAndPartition
+import kafka.controller.{ControlledShutdownLeaderSelector, ControllerContext}
+import org.easymock.EasyMock
+import org.junit.{Assert, Test}
+import Assert._
+import kafka.cluster.Broker
+import kafka.utils.ZkUtils
+
+import scala.collection.mutable
+
+class ControlledShutdownLeaderSelectorTest {
+
+ @Test
+ def testSelectLeader() {
+ val topicPartition = TopicAndPartition("topic", 1)
+ val assignment = Seq(6, 5, 4, 3, 2, 1)
+ val preferredReplicaId = assignment.head
+
+ val firstIsr = List(1, 3, 6)
+ val firstLeader = 1
+
+ val zkUtils = EasyMock.mock(classOf[ZkUtils])
+ val controllerContext = new ControllerContext(zkUtils, zkSessionTimeout = 1000)
+ controllerContext.liveBrokers = assignment.map(Broker(_, Map.empty, None)).toSet
+ controllerContext.shuttingDownBrokerIds = mutable.Set(2, 3)
+ controllerContext.partitionReplicaAssignment = mutable.Map(topicPartition -> assignment)
+
+ val leaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
+ val firstLeaderAndIsr = new LeaderAndIsr(firstLeader, firstIsr)
+ val (secondLeaderAndIsr, secondReplicas) = leaderSelector.selectLeader(topicPartition, firstLeaderAndIsr)
+
+ assertEquals(preferredReplicaId, secondLeaderAndIsr.leader)
+ assertEquals(Seq(1, 6), secondLeaderAndIsr.isr)
+ assertEquals(1, secondLeaderAndIsr.zkVersion)
+ assertEquals(1, secondLeaderAndIsr.leaderEpoch)
+ assertEquals(assignment, secondReplicas)
+
+ controllerContext.shuttingDownBrokerIds += preferredReplicaId
+
+ val deadBrokerId = 2
+ controllerContext.liveBrokers = controllerContext.liveOrShuttingDownBrokers.filter(_.id != deadBrokerId)
+ controllerContext.shuttingDownBrokerIds -= deadBrokerId
+
+ val (thirdLeaderAndIsr, thirdReplicas) = leaderSelector.selectLeader(topicPartition, secondLeaderAndIsr)
+
+ assertEquals(1, thirdLeaderAndIsr.leader)
+ assertEquals(Seq(1), thirdLeaderAndIsr.isr)
+ assertEquals(2, thirdLeaderAndIsr.zkVersion)
+ assertEquals(2, thirdLeaderAndIsr.leaderEpoch)
+ assertEquals(Seq(6, 5, 4, 3, 1), thirdReplicas)
+
+ }
+
+}