You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/08/25 19:44:31 UTC

[kafka] 01/02: KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft (#12063)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 44e419722e6f2baa4366eeb5eacc55b91fa9b0d1
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Tue Apr 19 13:17:16 2022 -0700

    KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft (#12063)
    
    Fix two bugs related to dynamic broker configs in KRaft. The first bug is that we were calling
    reloadUpdatedFilesWithoutConfigChange when a topic configuration was changed, but not when a
    broker configuration was changed. This is backwards. This function must be called only for
    per-broker configs, and never for topic configs or cluster configs.
    
    The second bug is that several configurations, such as max.connections, which are related to
    broker listeners but do not involve changing the registered listeners, could not be changed in
    KRaft. We should support these configurations in KRaft. This PR fixes the configuration change
    validation to support this case.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Matthew de Detrich <md...@gmail.com>
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 64 +++++++++++++++------
 .../server/metadata/BrokerMetadataListener.scala   | 18 ++++++
 .../server/metadata/BrokerMetadataPublisher.scala  | 41 ++++++++-----
 .../metadata/BrokerMetadataPublisherTest.scala     | 67 ++++++++++++++++++++++
 4 files changed, 159 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index a40444507b8..918e936724f 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -820,8 +820,12 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaBroker) extends Reconf
     configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala
   }
 }
-object DynamicListenerConfig {
 
+object DynamicListenerConfig {
+  /**
+   * The set of configurations which the DynamicListenerConfig object listens for. Many of
+   * these are also monitored by other objects such as ChannelBuilders and SocketServers.
+   */
   val ReconfigurableConfigs = Set(
     // Listener configs
     KafkaConfig.AdvertisedListenersProp,
@@ -909,11 +913,32 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
     DynamicListenerConfig.ReconfigurableConfigs
   }
 
+  /**
+   * Returns true if the advertised listener registrations differ between the old and
+   * new configuration: a listener was added or removed, or an endpoint was changed.
+   */
+  private def listenerRegistrationsAltered(
+    oldAdvertisedListeners: Map[ListenerName, EndPoint],
+    newAdvertisedListeners: Map[ListenerName, EndPoint]
+  ): Boolean = {
+    // With equal sizes, any added listener implies a missing or changed old one, so it
+    // suffices to look up every old listener in the new map. Using exists avoids the
+    // exception-based non-local returns that `return` inside a closure compiles to.
+    oldAdvertisedListeners.size != newAdvertisedListeners.size ||
+      oldAdvertisedListeners.exists { case (listenerName, oldEndpoint) =>
+        !newAdvertisedListeners.get(listenerName).contains(oldEndpoint)
+      }
+  }
+
+  // Throws ConfigException unless the broker config requires ZooKeeper (i.e. in KRaft mode).
+  private def verifyListenerRegistrationAlterationSupported(): Unit = {
+    val zkMode = server.config.requiresZookeeper
+    if (!zkMode) throw new ConfigException(
+      "Advertised listeners cannot be altered when using a Raft-based metadata quorum.")
+  }
+
   def validateReconfiguration(newConfig: KafkaConfig): Unit = {
     val oldConfig = server.config
-    if (!oldConfig.requiresZookeeper) {
-      throw new ConfigException("Dynamic reconfiguration of listeners is not yet supported when using a Raft-based metadata quorum")
-    }
     val newListeners = listenersToMap(newConfig.listeners)
     val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedListeners)
     val oldListeners = listenersToMap(oldConfig.listeners)
@@ -936,6 +961,13 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
     }
     if (!newAdvertisedListeners.contains(newConfig.interBrokerListenerName))
       throw new ConfigException(s"Advertised listener must be specified for inter-broker listener ${newConfig.interBrokerListenerName}")
+
+    // Currently, we do not support adding or removing listeners when in KRaft mode.
+    // However, we support changing other listener configurations (max connections, etc.)
+    if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedListeners),
+        listenersToMap(newConfig.effectiveAdvertisedListeners))) {
+      verifyListenerRegistrationAlterationSupported()
+    }
   }
 
   def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
@@ -945,18 +977,18 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
     val oldListenerMap = listenersToMap(oldListeners)
     val listenersRemoved = oldListeners.filterNot(e => newListenerMap.contains(e.listenerName))
     val listenersAdded = newListeners.filterNot(e => oldListenerMap.contains(e.listenerName))
-
-    // Clear SASL login cache to force re-login
-    if (listenersAdded.nonEmpty || listenersRemoved.nonEmpty)
-      LoginManager.closeAll()
-
-    server.socketServer.removeListeners(listenersRemoved)
-    if (listenersAdded.nonEmpty)
-      server.socketServer.addListeners(listenersAdded)
-
-    server match {
-      case kafkaServer: KafkaServer => kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo)
-      case _ =>
+    if (listenersRemoved.nonEmpty || listenersAdded.nonEmpty) {
+      LoginManager.closeAll() // Clear SASL login cache to force re-login
+      if (listenersRemoved.nonEmpty) server.socketServer.removeListeners(listenersRemoved)
+      if (listenersAdded.nonEmpty) server.socketServer.addListeners(listenersAdded)
+    }
+    if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedListeners),
+        listenersToMap(newConfig.effectiveAdvertisedListeners))) {
+      verifyListenerRegistrationAlterationSupported()
+      server match {
+        case kafkaServer: KafkaServer => kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo)
+        case _ => throw new RuntimeException("Unable to handle non-kafkaServer")
+      }
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 5b118220071..5b71409714d 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -248,6 +248,24 @@ class BrokerMetadataListener(
     }
   }
 
+  // Test-only hook: enqueues an event which replaces the publisher used by this listener.
+  def alterPublisher(publisher: MetadataPublisher): CompletableFuture[Void] = {
+    val alterEvent = new AlterPublisherEvent(publisher)
+    eventQueue.append(alterEvent)
+    alterEvent.future
+  }
+
+  // Event which installs `publisher` as the active publisher, then completes `future`.
+  class AlterPublisherEvent(publisher: MetadataPublisher)
+    extends EventQueue.FailureLoggingEvent(log) {
+    val future: CompletableFuture[Void] = new CompletableFuture[Void]()
+    override def run(): Unit = {
+      _publisher = Some(publisher)
+      log.info(s"Set publisher to ${publisher}")
+      future.complete(null)
+    }
+  }
+
   private def publish(publisher: MetadataPublisher): Unit = {
     val delta = _delta
     _image = _delta.apply()
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 74c5348afc7..291a1507d28 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -17,6 +17,8 @@
 
 package kafka.server.metadata
 
+import java.util.Properties
+
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.log.{LogManager, UnifiedLog}
@@ -187,21 +189,26 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
                 toLoggableProps(resource, props).mkString(","))
               dynamicConfigHandlers(ConfigType.Topic).
                 processConfigChanges(resource.name(), props)
-              conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
-            case BROKER => if (resource.name().isEmpty) {
-              // Apply changes to "cluster configs" (also known as default BROKER configs).
-              // These are stored in KRaft with an empty name field.
-              info(s"Updating cluster configuration : " +
-                toLoggableProps(resource, props).mkString(","))
-              dynamicConfigHandlers(ConfigType.Broker).
-                processConfigChanges(ConfigEntityName.Default, props)
-            } else if (resource.name().equals(brokerId.toString)) {
-              // Apply changes to this broker's dynamic configuration.
-              info(s"Updating broker ${brokerId} with new configuration : " +
-                toLoggableProps(resource, props).mkString(","))
-              dynamicConfigHandlers(ConfigType.Broker).
-                processConfigChanges(resource.name(), props)
-            }
+            case BROKER =>
+              if (resource.name().isEmpty) {
+                // Apply changes to "cluster configs" (also known as default BROKER configs).
+                // These are stored in KRaft with an empty name field.
+                info("Updating cluster configuration : " +
+                  toLoggableProps(resource, props).mkString(","))
+                dynamicConfigHandlers(ConfigType.Broker).
+                  processConfigChanges(ConfigEntityName.Default, props)
+              } else if (resource.name() == brokerId.toString) {
+                // Apply changes to this broker's dynamic configuration.
+                info(s"Updating broker ${brokerId} with new configuration : " +
+                  toLoggableProps(resource, props).mkString(","))
+                dynamicConfigHandlers(ConfigType.Broker).
+                  processConfigChanges(resource.name(), props)
+                // When applying a per broker config (not a cluster config), we also
+                // reload any associated file. For example, if the ssl.keystore is still
+                // set to /tmp/foo, we still want to reload /tmp/foo in case its contents
+                // have changed. This doesn't apply to topic configs or cluster configs.
+                reloadUpdatedFilesWithoutConfigChange(props)
+              }
             case _ => // nothing to do
           }
         }
@@ -250,6 +257,10 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
     }
   }
 
+  // Thin delegate to conf.dynamicConfig; BrokerMetadataPublisherTest stubs it on a spy.
+  def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit =
+    conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
+
   /**
    * Update the coordinator of local replica changes: election and resignation.
    *
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 9482ae27be3..329c9d1e1ea 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -17,15 +17,30 @@
 
 package unit.kafka.server.metadata
 
+import java.util.Collections.{singleton, singletonMap}
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
 import kafka.log.UnifiedLog
+import kafka.server.KafkaConfig
 import kafka.server.metadata.BrokerMetadataPublisher
+import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.ConfigResource.Type.BROKER
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.image.{MetadataImageTest, TopicImage, TopicsImage}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.metadata.PartitionRegistration
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Test
+import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+
 import scala.jdk.CollectionConverters._
 
 class BrokerMetadataPublisherTest {
@@ -142,4 +157,56 @@ class BrokerMetadataPublisherTest {
     new TopicsImage(idsMap.asJava, namesMap.asJava)
   }
 
+  // Verifies that reloadUpdatedFilesWithoutConfigChange is invoked for per-broker config
+  // changes, but not for cluster-wide (default BROKER) config changes.
+  @Test
+  def testReloadUpdatedFilesWithoutConfigChange(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().setNumBrokerNodes(1).setNumControllerNodes(1).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      val broker = cluster.brokers().values().iterator().next()
+      val publisher = Mockito.spy(new BrokerMetadataPublisher(
+        conf = broker.config,
+        metadataCache = broker.metadataCache,
+        logManager = broker.logManager,
+        replicaManager = broker.replicaManager,
+        groupCoordinator = broker.groupCoordinator,
+        txnCoordinator = broker.transactionCoordinator,
+        clientQuotaMetadataManager = broker.clientQuotaMetadataManager,
+        featureCache = broker.featureCache,
+        dynamicConfigHandlers = broker.dynamicConfigHandlers.toMap,
+        _authorizer = Option.empty
+      ))
+      val numTimesReloadCalled = new AtomicInteger(0)
+      // Stub with doAnswer: when(spy.method(...)) would invoke the real method while stubbing.
+      Mockito.doAnswer(new Answer[Unit]() {
+        override def answer(invocation: InvocationOnMock): Unit = numTimesReloadCalled.addAndGet(1)
+      }).when(publisher).reloadUpdatedFilesWithoutConfigChange(any[Properties]())
+      broker.metadataListener.alterPublisher(publisher).get()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        assertEquals(0, numTimesReloadCalled.get())
+        // A cluster-wide (default BROKER) config change must not trigger a file reload.
+        admin.incrementalAlterConfigs(singletonMap(
+          new ConfigResource(BROKER, ""),
+          singleton(new AlterConfigOp(new ConfigEntry(KafkaConfig.MaxConnectionsProp, "123"), SET)))).all().get()
+        TestUtils.waitUntilTrue(() => numTimesReloadCalled.get() == 0,
+          "reloadUpdatedFilesWithoutConfigChange was called for a cluster config change")
+
+        // A per-broker change must trigger a reload even though the effective value stays 123.
+        admin.incrementalAlterConfigs(singletonMap(
+          new ConfigResource(BROKER, broker.config.nodeId.toString),
+          singleton(new AlterConfigOp(new ConfigEntry(KafkaConfig.MaxConnectionsProp, "123"), SET)))).all().get()
+        TestUtils.waitUntilTrue(() => numTimesReloadCalled.get() == 1,
+          "reloadUpdatedFilesWithoutConfigChange was not called for a per-broker config change")
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
 }