You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/08/04 19:19:00 UTC

[kafka] branch 3.3 updated: KAFKA-14136 Generate ConfigRecord for brokers even if the value is unchanged (#12483)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 3fa3272036 KAFKA-14136 Generate ConfigRecord for brokers even if the value is unchanged (#12483)
3fa3272036 is described below

commit 3fa327203658e458f23aa9f7f7ca3e7d3d9b3e43
Author: David Arthur <mu...@gmail.com>
AuthorDate: Thu Aug 4 15:09:08 2022 -0400

    KAFKA-14136 Generate ConfigRecord for brokers even if the value is unchanged (#12483)
---
 .../server/DynamicBrokerReconfigurationTest.scala  | 54 ++++++++++++++--------
 .../controller/ConfigurationControlManager.java    | 13 ++++--
 2 files changed, 44 insertions(+), 23 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index c7be8ce831..bd6308c0b8 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -67,6 +67,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.concurrent.atomic.AtomicInteger
 import scala.annotation.nowarn
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer
@@ -352,8 +353,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     }
   }
 
-  @Test // TODO KAFKA-14126 add KRaft support
-  def testKeyStoreAlter(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testKeyStoreAlter(quorum: String): Unit = {
     val topic2 = "testtopic2"
     TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions, replicationFactor = numServers)
 
@@ -419,8 +421,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     stopAndVerifyProduceConsume(producerThread, consumerThread)
   }
 
-  @Test // TODO KAFKA-14126 add KRaft support
-  def testTrustStoreAlter(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTrustStoreAlter(quorum: String): Unit = {
     val producerBuilder = ProducerBuilder().listenerName(SecureInternal).securityProtocol(SecurityProtocol.SSL)
 
     // Producer with new keystore should fail to connect before truststore update
@@ -467,9 +470,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       assertFalse(response.wasDisconnected(), "Request failed because broker is not available")
     }
 
+    val group_id = new AtomicInteger(1)
+    def next_group_name(): String = s"alter-truststore-${group_id.getAndIncrement()}"
+
     // Produce/consume should work with old as well as new client keystore
-    verifySslProduceConsume(sslProperties1, "alter-truststore-1")
-    verifySslProduceConsume(sslProperties2, "alter-truststore-2")
+    verifySslProduceConsume(sslProperties1, next_group_name())
+    verifySslProduceConsume(sslProperties2, next_group_name())
 
     // Revert to old truststore with only one certificate and update. Clients should connect only with old keystore.
     val oldTruststoreProps = new Properties
@@ -478,7 +484,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     reconfigureServers(oldTruststoreProps, perBrokerConfig = true,
       (s"$prefix$SSL_TRUSTSTORE_LOCATION_CONFIG", sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)))
     verifyAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties2).build())
-    verifySslProduceConsume(sslProperties1, "alter-truststore-3")
+    verifySslProduceConsume(sslProperties1, next_group_name())
 
     // Update same truststore file to contain both certificates without changing any configs.
     // Clients should connect successfully with either keystore after admin client AlterConfigsRequest completes.
@@ -486,8 +492,14 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       Paths.get(sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)),
       StandardCopyOption.REPLACE_EXISTING)
     TestUtils.incrementalAlterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get()
-    verifySslProduceConsume(sslProperties1, "alter-truststore-4")
-    verifySslProduceConsume(sslProperties2, "alter-truststore-5")
+    TestUtils.retry(30000) {
+      try {
+        verifySslProduceConsume(sslProperties1, next_group_name())
+        verifySslProduceConsume(sslProperties2, next_group_name())
+      } catch {
+        case t: Throwable => throw new AssertionError(t)
+      }
+    }
 
     // Update internal keystore/truststore and validate new client connections from broker (e.g. controller).
     // Alter internal keystore from `sslProperties1` to `sslProperties2`, force disconnect of a controller connection
@@ -495,21 +507,23 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     val props2 = securityProps(sslProperties2, KEYSTORE_PROPS, prefix)
     props2 ++= securityProps(combinedStoreProps, TRUSTSTORE_PROPS, prefix)
     TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
-    verifySslProduceConsume(sslProperties2, "alter-truststore-6")
+    verifySslProduceConsume(sslProperties2, next_group_name())
     props2 ++= securityProps(sslProperties2, TRUSTSTORE_PROPS, prefix)
     TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
-    verifySslProduceConsume(sslProperties2, "alter-truststore-7")
+    verifySslProduceConsume(sslProperties2, next_group_name())
     waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))
 
-    val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
-    val controllerChannelManager = controller.kafkaController.controllerChannelManager
-    val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
-      JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
-    brokerStateInfo(0).networkClient.disconnect("0")
-    TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers)
-
-    // validate that the brokerToController request works fine
-    verifyBrokerToControllerCall(controller)
+    if (!isKRaftTest()) {
+      val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
+      val controllerChannelManager = controller.kafkaController.controllerChannelManager
+      val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
+        JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
+      brokerStateInfo(0).networkClient.disconnect("0")
+      TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers)
+
+      // validate that the brokerToController request works fine
+      verifyBrokerToControllerCall(controller)
+    }
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 4b8561a4d9..4d6736b878 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigResource.Type;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.utils.LogContext;
@@ -228,7 +229,8 @@ public class ConfigurationControlManager {
                     newValue = String.join(",", oldValueList);
                     break;
             }
-            if (!Objects.equals(currentValue, newValue)) {
+            if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) {
+                // KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers
                 newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
                     setResourceType(configResource.type().id()).
                     setResourceName(configResource.name()).
@@ -317,7 +319,8 @@ public class ConfigurationControlManager {
             String key = entry.getKey();
             String newValue = entry.getValue();
             String currentValue = currentConfigs.get(key);
-            if (!Objects.equals(newValue, currentValue)) {
+            if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) {
+                // KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers
                 newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
                     setResourceType(configResource.type().id()).
                     setResourceName(configResource.name()).
@@ -381,7 +384,11 @@ public class ConfigurationControlManager {
         if (configs.isEmpty()) {
             configData.remove(configResource);
         }
-        log.info("{}: set configuration {} to {}", configResource, record.name(), record.value());
+        if (configSchema.isSensitive(record)) {
+            log.info("{}: set configuration {} to {}", configResource, record.name(), Password.HIDDEN);
+        } else {
+            log.info("{}: set configuration {} to {}", configResource, record.name(), record.value());
+        }
     }
 
     // VisibleForTesting