You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/04/11 17:36:15 UTC
[kafka] branch 1.1 updated: KAFKA-6752: Enable unclean leader election metric (#4838)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 129859d KAFKA-6752: Enable unclean leader election metric (#4838)
129859d is described below
commit 129859d0f7bf6cfe394706d6b69a18be510b32d8
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Wed Apr 11 23:00:30 2018 +0530
KAFKA-6752: Enable unclean leader election metric (#4838)
Reviewers: Jun Rao <ju...@gmail.com>
---
.../kafka/controller/PartitionStateMachine.scala | 9 +++++---
.../PartitionLeaderElectionAlgorithmsTest.scala | 26 ++++++++++++++++------
.../integration/UncleanLeaderElectionTest.scala | 14 ++++++++++--
3 files changed, 37 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 2e27272..d760061 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -362,7 +362,7 @@ class PartitionStateMachine(config: KafkaConfig,
if (leaderIsrAndControllerEpochOpt.nonEmpty) {
val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochOpt.get
val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
- val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled)
+ val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext)
val newLeaderAndIsrOpt = leaderOpt.map { leader =>
val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
else List(leader)
@@ -435,10 +435,13 @@ class PartitionStateMachine(config: KafkaConfig,
}
object PartitionLeaderElectionAlgorithms {
- def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean): Option[Int] = {
+ def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = {
assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
if (uncleanLeaderElectionEnabled) {
- assignment.find(liveReplicas.contains)
+ val leaderOpt = assignment.find(liveReplicas.contains)
+ if (!leaderOpt.isEmpty)
+ controllerContext.stats.uncleanLeaderElectionRate.mark()
+ leaderOpt
} else {
None
}
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
index f149fc9..113a39d 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
@@ -17,10 +17,17 @@
package kafka.controller
import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Before, Test}
import org.scalatest.junit.JUnitSuite
class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite {
+ private var controllerContext: ControllerContext = null
+
+ @Before
+ def setUp(): Unit = {
+ controllerContext = new ControllerContext
+ controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec")
+ }
@Test
def testOfflinePartitionLeaderElection(): Unit = {
@@ -30,7 +37,8 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite {
val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
isr,
liveReplicas,
- uncleanLeaderElectionEnabled = false)
+ uncleanLeaderElectionEnabled = false,
+ controllerContext)
assertEquals(Option(4), leaderOpt)
}
@@ -42,9 +50,12 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite {
val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
isr,
liveReplicas,
- uncleanLeaderElectionEnabled = false)
+ uncleanLeaderElectionEnabled = false,
+ controllerContext)
assertEquals(None, leaderOpt)
+ assertEquals(0, controllerContext.stats.uncleanLeaderElectionRate.count())
}
+
@Test
def testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionEnabled(): Unit = {
val assignment = Seq(2, 4)
@@ -53,8 +64,10 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite {
val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
isr,
liveReplicas,
- uncleanLeaderElectionEnabled = true)
+ uncleanLeaderElectionEnabled = true,
+ controllerContext)
assertEquals(Option(4), leaderOpt)
+ assertEquals(1, controllerContext.stats.uncleanLeaderElectionRate.count())
}
@Test
@@ -62,10 +75,9 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite {
val reassignment = Seq(2, 4)
val isr = Seq(2, 4)
val liveReplicas = Set(4)
- val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(reassignment,
+ val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(reassignment,
isr,
- liveReplicas,
- uncleanLeaderElectionEnabled = false)
+ liveReplicas)
assertEquals(Option(4), leaderOpt)
}
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 5269f92..608f3a6 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -191,12 +191,17 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
produceMessage(servers, topic, "second")
assertEquals(List("first", "second"), consumeAllMessages(topic))
+ //remove any previous unclean election metric
+ servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
+
// shutdown leader and then restart follower
servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
- servers.filter(server => server.config.brokerId == followerId).map(server => server.startup())
+ val followerServer = servers.find(_.config.brokerId == followerId).get
+ followerServer.startup()
// wait until new leader is (uncleanly) elected
waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))
+ assertEquals(1, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())
produceMessage(servers, topic, "third")
@@ -224,12 +229,17 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
produceMessage(servers, topic, "second")
assertEquals(List("first", "second"), consumeAllMessages(topic))
+ //remove any previous unclean election metric
+ servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
+
// shutdown leader and then restart follower
servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
- servers.filter(server => server.config.brokerId == followerId).map(server => server.startup())
+ val followerServer = servers.find(_.config.brokerId == followerId).get
+ followerServer.startup()
// verify that unclean election to non-ISR follower does not occur
waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(-1))
+ assertEquals(0, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())
// message production and consumption should both fail while leader is down
try {
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.