You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/05/01 14:27:12 UTC
[kafka] branch trunk updated: KAFKA-6526: Enable unclean leader election without controller change (#4920)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b4d8552 KAFKA-6526: Enable unclean leader election without controller change (#4920)
b4d8552 is described below
commit b4d8552218c9ab41bc1e0221c4e79417a2662d19
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue May 1 15:27:02 2018 +0100
KAFKA-6526: Enable unclean leader election without controller change (#4920)
Enable dynamic update of the brokers' default unclean leader election config. A new controller event has been added to process unclean leader election when the config is enabled dynamically, so that no controller change is needed for it to take effect.
Reviewers: Dong Lin <li...@gmail.com>, Manikumar Reddy <ma...@gmail.com>
---
.../scala/kafka/controller/ControllerState.scala | 6 ++-
.../scala/kafka/controller/KafkaController.scala | 14 +++++
.../scala/kafka/server/DynamicBrokerConfig.scala | 8 ++-
.../server/DynamicBrokerReconfigurationTest.scala | 62 +++++++++++++++++++++-
4 files changed, 85 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala
index 17af777..d247305 100644
--- a/core/src/main/scala/kafka/controller/ControllerState.scala
+++ b/core/src/main/scala/kafka/controller/ControllerState.scala
@@ -90,7 +90,11 @@ object ControllerState {
def value = 12
}
+ case object UncleanLeaderElectionEnable extends ControllerState {
+ def value = 13
+ }
+
val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion,
PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived,
- LogDirChange, ControllerShutdown)
+ LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable)
}
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index eee625e..bc721e39 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -195,6 +195,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
zkClient.updateBrokerInfoInZk(newBrokerInfo)
}
+ private[kafka] def enableDefaultUncleanLeaderElection(): Unit = {
+ eventManager.put(UncleanLeaderElectionEnable)
+ }
+
private def state: ControllerState = eventManager.state
/**
@@ -1009,6 +1013,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
}
}
+ case object UncleanLeaderElectionEnable extends ControllerEvent {
+
+ def state = ControllerState.UncleanLeaderElectionEnable
+
+ override def process(): Unit = {
+ if (!isActive) return
+ partitionStateMachine.triggerOnlinePartitionStateChange()
+ }
+ }
+
case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends ControllerEvent {
def state = ControllerState.ControlledShutdown
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index be0ed6b..004b531 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -156,7 +156,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
if (kafkaServer.logManager.cleaner != null)
addBrokerReconfigurable(kafkaServer.logManager.cleaner)
- addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
+ addReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))
addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
@@ -501,7 +501,7 @@ object DynamicLogConfig {
val ReconfigurableConfigs = LogConfig.TopicConfigSynonyms.values.toSet -- ExcludedConfigs
val KafkaConfigToLogConfigName = LogConfig.TopicConfigSynonyms.map { case (k, v) => (v, k) }
}
-class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Logging {
+class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Reconfigurable with Logging {
override def configure(configs: util.Map[String, _]): Unit = {}
@@ -517,6 +517,7 @@ class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Loggi
override def reconfigure(configs: util.Map[String, _]): Unit = {
val currentLogConfig = logManager.currentDefaultConfig
+ val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable
val newBrokerDefaults = new util.HashMap[String, Object](currentLogConfig.originals)
configs.asScala.filterKeys(DynamicLogConfig.ReconfigurableConfigs.contains).foreach { case (k, v) =>
if (v != null) {
@@ -536,6 +537,9 @@ class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Loggi
val logConfig = LogConfig(props.asJava)
log.updateConfig(newBrokerDefaults.asScala.keySet, logConfig)
}
+ if (logManager.currentDefaultConfig.uncleanLeaderElectionEnable && !origUncleanLeaderElectionEnable) {
+ server.kafkaController.enableDefaultUncleanLeaderElection()
+ }
}
}
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 3c6c039..8b70875 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -43,7 +43,7 @@ import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, Reconfigurable, TopicPartition}
+import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, Reconfigurable, TopicPartition, TopicPartitionInfo}
import org.apache.kafka.common.config.{ConfigException, ConfigResource}
import org.apache.kafka.common.config.SslConfigs._
import org.apache.kafka.common.config.types.Password
@@ -430,6 +430,62 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
@Test
+ def testUncleanLeaderElectionEnable(): Unit = {
+ val topic = "testtopic2"
+ TestUtils.createTopic(zkClient, topic, 1, replicationFactor = 2, servers)
+ val producer = ProducerBuilder().maxRetries(1000).acks(1).build()
+ val consumer = ConsumerBuilder("unclean-leader-test").enableAutoCommit(false).topic(topic).build()
+ verifyProduceConsume(producer, consumer, numRecords = 10, topic)
+ consumer.commitSync()
+
+ def partitionInfo: TopicPartitionInfo =
+ adminClients.head.describeTopics(Collections.singleton(topic)).values.get(topic).get().partitions().get(0)
+
+ val partitionInfo0 = partitionInfo
+ assertEquals(partitionInfo0.replicas.get(0), partitionInfo0.leader)
+ val leaderBroker = servers.find(_.config.brokerId == partitionInfo0.replicas.get(0).id).get
+ val followerBroker = servers.find(_.config.brokerId == partitionInfo0.replicas.get(1).id).get
+
+ // Stop follower
+ followerBroker.shutdown()
+ followerBroker.awaitShutdown()
+
+ // Produce and consume some messages when the only follower is down, this should succeed since MinIsr is 1
+ verifyProduceConsume(producer, consumer, numRecords = 10, topic)
+ consumer.commitSync()
+
+ // Shutdown leader and startup follower
+ leaderBroker.shutdown()
+ leaderBroker.awaitShutdown()
+ followerBroker.startup()
+ val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
+
+ // Verify that new leader is not elected with unclean leader disabled since there are no ISRs
+ TestUtils.waitUntilTrue(() => partitionInfo.leader == null, "Unclean leader elected")
+
+ // Enable unclean leader election
+ val newProps = new Properties
+ newProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
+ TestUtils.alterConfigs(servers, adminClients.head, newProps, perBrokerConfig = false).all.get
+ waitForConfigOnServer(controller, KafkaConfig.UncleanLeaderElectionEnableProp, "true")
+
+ // Verify that the old follower with missing records is elected as the new leader
+ val (newLeader, elected) = TestUtils.computeUntilTrue(partitionInfo.leader)(leader => leader != null)
+ assertTrue("Unclean leader not elected", elected)
+ assertEquals(followerBroker.config.brokerId, newLeader.id)
+
+ // New leader doesn't have the last 10 records committed on the old leader that have already been consumed.
+ // With unclean leader election enabled, we should be able to produce to the new leader. The first 10 records
+ // produced will not be consumed since they have offsets less than the consumer's committed offset.
+ // Next 10 records produced should be consumed.
+ (1 to 10).map(i => new ProducerRecord(topic, s"key$i", s"value$i"))
+ .map(producer.send)
+ .map(_.get(10, TimeUnit.SECONDS))
+ verifyProduceConsume(producer, consumer, numRecords = 10, topic)
+ consumer.commitSync()
+ }
+
+ @Test
def testThreadPoolResize(): Unit = {
val requestHandlerPrefix = "kafka-request-handler-"
val networkThreadPrefix = "kafka-network-thread-"
@@ -1272,12 +1328,14 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
private case class ProducerBuilder() extends ClientBuilder[KafkaProducer[String, String]] {
private var _retries = 0
+ private var _acks = -1
def maxRetries(retries: Int): ProducerBuilder = { _retries = retries; this }
+ def acks(acks: Int): ProducerBuilder = { _acks = acks; this }
override def build(): KafkaProducer[String, String] = {
val producer = TestUtils.createNewProducer(bootstrapServers,
- acks = -1,
+ acks = _acks,
retries = _retries,
securityProtocol = _securityProtocol,
trustStoreFile = Some(trustStoreFile1),
--
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.