You are viewing a plain text version of this content. The canonical link is available in the original HTML version of this message (not reproduced in this plain text rendering).
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/05/01 14:27:12 UTC

[kafka] branch trunk updated: KAFKA-6526: Enable unclean leader election without controller change (#4920)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b4d8552  KAFKA-6526: Enable unclean leader election without controller change (#4920)
b4d8552 is described below

commit b4d8552218c9ab41bc1e0221c4e79417a2662d19
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue May 1 15:27:02 2018 +0100

    KAFKA-6526: Enable unclean leader election without controller change (#4920)
    
    Enable dynamic update of the brokers' default unclean leader election config. A new controller event has been added to trigger unclean leader election when the config is enabled dynamically.
    
    Reviewers: Dong Lin <li...@gmail.com>, Manikumar Reddy <ma...@gmail.com>
---
 .../scala/kafka/controller/ControllerState.scala   |  6 ++-
 .../scala/kafka/controller/KafkaController.scala   | 14 +++++
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  8 ++-
 .../server/DynamicBrokerReconfigurationTest.scala  | 62 +++++++++++++++++++++-
 4 files changed, 85 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala
index 17af777..d247305 100644
--- a/core/src/main/scala/kafka/controller/ControllerState.scala
+++ b/core/src/main/scala/kafka/controller/ControllerState.scala
@@ -90,7 +90,11 @@ object ControllerState {
     def value = 12
   }
 
+  case object UncleanLeaderElectionEnable extends ControllerState {
+    def value = 13
+  }
+
   val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion,
     PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived,
-    LogDirChange, ControllerShutdown)
+    LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable)
 }
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index eee625e..bc721e39 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -195,6 +195,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     zkClient.updateBrokerInfoInZk(newBrokerInfo)
   }
 
+  private[kafka] def enableDefaultUncleanLeaderElection(): Unit = {
+    eventManager.put(UncleanLeaderElectionEnable)
+  }
+
   private def state: ControllerState = eventManager.state
 
   /**
@@ -1009,6 +1013,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   }
 
+  case object UncleanLeaderElectionEnable extends ControllerEvent {
+
+    def state = ControllerState.UncleanLeaderElectionEnable
+
+    override def process(): Unit = {
+      if (!isActive) return
+      partitionStateMachine.triggerOnlinePartitionStateChange()
+    }
+  }
+
   case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends ControllerEvent {
 
     def state = ControllerState.ControlledShutdown
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index be0ed6b..004b531 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -156,7 +156,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
     if (kafkaServer.logManager.cleaner != null)
       addBrokerReconfigurable(kafkaServer.logManager.cleaner)
-    addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
+    addReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
     addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
     addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))
     addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
@@ -501,7 +501,7 @@ object DynamicLogConfig {
   val ReconfigurableConfigs = LogConfig.TopicConfigSynonyms.values.toSet -- ExcludedConfigs
   val KafkaConfigToLogConfigName = LogConfig.TopicConfigSynonyms.map { case (k, v) => (v, k) }
 }
-class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Logging {
+class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Reconfigurable with Logging {
 
   override def configure(configs: util.Map[String, _]): Unit = {}
 
@@ -517,6 +517,7 @@ class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Loggi
 
   override def reconfigure(configs: util.Map[String, _]): Unit = {
     val currentLogConfig = logManager.currentDefaultConfig
+    val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable
     val newBrokerDefaults = new util.HashMap[String, Object](currentLogConfig.originals)
     configs.asScala.filterKeys(DynamicLogConfig.ReconfigurableConfigs.contains).foreach { case (k, v) =>
       if (v != null) {
@@ -536,6 +537,9 @@ class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Loggi
       val logConfig = LogConfig(props.asJava)
       log.updateConfig(newBrokerDefaults.asScala.keySet, logConfig)
     }
+    if (logManager.currentDefaultConfig.uncleanLeaderElectionEnable && !origUncleanLeaderElectionEnable) {
+      server.kafkaController.enableDefaultUncleanLeaderElection()
+    }
   }
 }
 
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 3c6c039..8b70875 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -43,7 +43,7 @@ import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, Reconfigurable, TopicPartition}
+import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, Reconfigurable, TopicPartition, TopicPartitionInfo}
 import org.apache.kafka.common.config.{ConfigException, ConfigResource}
 import org.apache.kafka.common.config.SslConfigs._
 import org.apache.kafka.common.config.types.Password
@@ -430,6 +430,62 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   }
 
   @Test
+  def testUncleanLeaderElectionEnable(): Unit = {
+    val topic = "testtopic2"
+    TestUtils.createTopic(zkClient, topic, 1, replicationFactor = 2, servers)
+    val producer = ProducerBuilder().maxRetries(1000).acks(1).build()
+    val consumer = ConsumerBuilder("unclean-leader-test").enableAutoCommit(false).topic(topic).build()
+    verifyProduceConsume(producer, consumer, numRecords = 10, topic)
+    consumer.commitSync()
+
+    def partitionInfo: TopicPartitionInfo =
+      adminClients.head.describeTopics(Collections.singleton(topic)).values.get(topic).get().partitions().get(0)
+
+    val partitionInfo0 = partitionInfo
+    assertEquals(partitionInfo0.replicas.get(0), partitionInfo0.leader)
+    val leaderBroker = servers.find(_.config.brokerId == partitionInfo0.replicas.get(0).id).get
+    val followerBroker = servers.find(_.config.brokerId == partitionInfo0.replicas.get(1).id).get
+
+    // Stop follower
+    followerBroker.shutdown()
+    followerBroker.awaitShutdown()
+
+    // Produce and consume some messages when the only follower is down, this should succeed since MinIsr is 1
+    verifyProduceConsume(producer, consumer, numRecords = 10, topic)
+    consumer.commitSync()
+
+    // Shutdown leader and startup follower
+    leaderBroker.shutdown()
+    leaderBroker.awaitShutdown()
+    followerBroker.startup()
+    val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
+
+    // Verify that new leader is not elected with unclean leader disabled since there are no ISRs
+    TestUtils.waitUntilTrue(() => partitionInfo.leader == null, "Unclean leader elected")
+
+    // Enable unclean leader election
+    val newProps = new Properties
+    newProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
+    TestUtils.alterConfigs(servers, adminClients.head, newProps, perBrokerConfig = false).all.get
+    waitForConfigOnServer(controller, KafkaConfig.UncleanLeaderElectionEnableProp, "true")
+
+    // Verify that the old follower with missing records is elected as the new leader
+    val (newLeader, elected) = TestUtils.computeUntilTrue(partitionInfo.leader)(leader => leader != null)
+    assertTrue("Unclean leader not elected", elected)
+    assertEquals(followerBroker.config.brokerId, newLeader.id)
+
+    // New leader doesn't have the last 10 records committed on the old leader that have already been consumed.
+    // With unclean leader election enabled, we should be able to produce to the new leader. The first 10 records
+    // produced will not be consumed since they have offsets less than the consumer's committed offset.
+    // Next 10 records produced should be consumed.
+    (1 to 10).map(i => new ProducerRecord(topic, s"key$i", s"value$i"))
+      .map(producer.send)
+      .map(_.get(10, TimeUnit.SECONDS))
+    verifyProduceConsume(producer, consumer, numRecords = 10, topic)
+    consumer.commitSync()
+  }
+
+  @Test
   def testThreadPoolResize(): Unit = {
     val requestHandlerPrefix = "kafka-request-handler-"
     val networkThreadPrefix = "kafka-network-thread-"
@@ -1272,12 +1328,14 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 
   private case class ProducerBuilder() extends ClientBuilder[KafkaProducer[String, String]] {
     private var _retries = 0
+    private var _acks = -1
 
     def maxRetries(retries: Int): ProducerBuilder = { _retries = retries; this }
+    def acks(acks: Int): ProducerBuilder = { _acks = acks; this }
 
     override def build(): KafkaProducer[String, String] = {
       val producer = TestUtils.createNewProducer(bootstrapServers,
-        acks = -1,
+        acks = _acks,
         retries = _retries,
         securityProtocol = _securityProtocol,
         trustStoreFile = Some(trustStoreFile1),

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.