You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/08/04 19:09:15 UTC
[kafka] branch trunk updated: KAFKA-14136 Generate ConfigRecord for brokers even if the value is unchanged (#12483)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new add7cd85ba KAFKA-14136 Generate ConfigRecord for brokers even if the value is unchanged (#12483)
add7cd85ba is described below
commit add7cd85baa61cd0e1430a0299dc330321126f31
Author: David Arthur <mu...@gmail.com>
AuthorDate: Thu Aug 4 15:09:08 2022 -0400
KAFKA-14136 Generate ConfigRecord for brokers even if the value is unchanged (#12483)
---
.../server/DynamicBrokerReconfigurationTest.scala | 54 ++++++++++++++--------
.../controller/ConfigurationControlManager.java | 13 ++++--
2 files changed, 44 insertions(+), 23 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index aeeabcc008..295ad06121 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -67,6 +67,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
+import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.nowarn
import scala.collection._
import scala.collection.mutable.ArrayBuffer
@@ -352,8 +353,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
}
- @Test // TODO KAFKA-14126 add KRaft support
- def testKeyStoreAlter(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testKeyStoreAlter(quorum: String): Unit = {
val topic2 = "testtopic2"
TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions, replicationFactor = numServers)
@@ -421,8 +423,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
stopAndVerifyProduceConsume(producerThread, consumerThread)
}
- @Test // TODO KAFKA-14126 add KRaft support
- def testTrustStoreAlter(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testTrustStoreAlter(quorum: String): Unit = {
val producerBuilder = ProducerBuilder().listenerName(SecureInternal).securityProtocol(SecurityProtocol.SSL)
// Producer with new keystore should fail to connect before truststore update
@@ -469,9 +472,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
assertFalse(response.wasDisconnected(), "Request failed because broker is not available")
}
+ val group_id = new AtomicInteger(1)
+ def next_group_name(): String = s"alter-truststore-${group_id.getAndIncrement()}"
+
// Produce/consume should work with old as well as new client keystore
- verifySslProduceConsume(sslProperties1, "alter-truststore-1")
- verifySslProduceConsume(sslProperties2, "alter-truststore-2")
+ verifySslProduceConsume(sslProperties1, next_group_name())
+ verifySslProduceConsume(sslProperties2, next_group_name())
// Revert to old truststore with only one certificate and update. Clients should connect only with old keystore.
val oldTruststoreProps = new Properties
@@ -480,7 +486,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
reconfigureServers(oldTruststoreProps, perBrokerConfig = true,
(s"$prefix$SSL_TRUSTSTORE_LOCATION_CONFIG", sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)))
verifyAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties2).build())
- verifySslProduceConsume(sslProperties1, "alter-truststore-3")
+ verifySslProduceConsume(sslProperties1, next_group_name())
// Update same truststore file to contain both certificates without changing any configs.
// Clients should connect successfully with either keystore after admin client AlterConfigsRequest completes.
@@ -488,8 +494,14 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
Paths.get(sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)),
StandardCopyOption.REPLACE_EXISTING)
TestUtils.incrementalAlterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get()
- verifySslProduceConsume(sslProperties1, "alter-truststore-4")
- verifySslProduceConsume(sslProperties2, "alter-truststore-5")
+ TestUtils.retry(30000) {
+ try {
+ verifySslProduceConsume(sslProperties1, next_group_name())
+ verifySslProduceConsume(sslProperties2, next_group_name())
+ } catch {
+ case t: Throwable => throw new AssertionError(t)
+ }
+ }
// Update internal keystore/truststore and validate new client connections from broker (e.g. controller).
// Alter internal keystore from `sslProperties1` to `sslProperties2`, force disconnect of a controller connection
@@ -497,21 +509,23 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
val props2 = securityProps(sslProperties2, KEYSTORE_PROPS, prefix)
props2 ++= securityProps(combinedStoreProps, TRUSTSTORE_PROPS, prefix)
TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
- verifySslProduceConsume(sslProperties2, "alter-truststore-6")
+ verifySslProduceConsume(sslProperties2, next_group_name())
props2 ++= securityProps(sslProperties2, TRUSTSTORE_PROPS, prefix)
TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
- verifySslProduceConsume(sslProperties2, "alter-truststore-7")
+ verifySslProduceConsume(sslProperties2, next_group_name())
waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))
- val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
- val controllerChannelManager = controller.kafkaController.controllerChannelManager
- val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
- JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
- brokerStateInfo(0).networkClient.disconnect("0")
- TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers)
-
- // validate that the brokerToController request works fine
- verifyBrokerToControllerCall(controller)
+ if (!isKRaftTest()) {
+ val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
+ val controllerChannelManager = controller.kafkaController.controllerChannelManager
+ val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
+ JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
+ brokerStateInfo(0).networkClient.disconnect("0")
+ TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers)
+
+ // validate that the brokerToController request works fine
+ verifyBrokerToControllerCall(controller)
+ }
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 4b8561a4d9..4d6736b878 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
@@ -228,7 +229,8 @@ public class ConfigurationControlManager {
newValue = String.join(",", oldValueList);
break;
}
- if (!Objects.equals(currentValue, newValue)) {
+ if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) {
+ // KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers
newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
setResourceType(configResource.type().id()).
setResourceName(configResource.name()).
@@ -317,7 +319,8 @@ public class ConfigurationControlManager {
String key = entry.getKey();
String newValue = entry.getValue();
String currentValue = currentConfigs.get(key);
- if (!Objects.equals(newValue, currentValue)) {
+ if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) {
+ // KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers
newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
setResourceType(configResource.type().id()).
setResourceName(configResource.name()).
@@ -381,7 +384,11 @@ public class ConfigurationControlManager {
if (configs.isEmpty()) {
configData.remove(configResource);
}
- log.info("{}: set configuration {} to {}", configResource, record.name(), record.value());
+ if (configSchema.isSensitive(record)) {
+ log.info("{}: set configuration {} to {}", configResource, record.name(), Password.HIDDEN);
+ } else {
+ log.info("{}: set configuration {} to {}", configResource, record.name(), record.value());
+ }
}
// VisibleForTesting