Posted to commits@kafka.apache.org by da...@apache.org on 2022/07/15 19:48:59 UTC

[kafka] branch trunk updated: KAFKA-14039 Fix AlterConfigPolicy usage in KRaft (#12374)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c020c94e04 KAFKA-14039 Fix AlterConfigPolicy usage in KRaft (#12374)
c020c94e04 is described below

commit c020c94e043ca4a997797da91a226735803774ab
Author: David Arthur <mu...@gmail.com>
AuthorDate: Fri Jul 15 15:48:35 2022 -0400

    KAFKA-14039 Fix AlterConfigPolicy usage in KRaft (#12374)
    
    Only pass configs from the request to the AlterConfigPolicy. This changes the KRaft usage of the AlterConfigPolicy to match the usage in ZK mode.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../AdminClientWithPoliciesIntegrationTest.scala   | 40 +++++++++++++++++++---
 .../controller/ConfigurationControlManager.java    | 14 ++++----
 .../ConfigurationControlManagerTest.java           | 26 ++++++++++----
 3 files changed, 64 insertions(+), 16 deletions(-)
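
For context, here is a minimal sketch of an AlterConfigPolicy implementation of the kind exercised by the integration test in this patch. It is not part of the commit; the class name and the config key it rejects are illustrative. The point it shows: after this fix, requestMetadata.configs() in KRaft mode contains only the entries named in the alter request (matching ZK mode), rather than the merged set of existing plus requested configs.

import java.util.Map;

import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.AlterConfigPolicy;

public class RejectMinIsrAlterConfigPolicy implements AlterConfigPolicy {

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this example.
    }

    @Override
    public void validate(AlterConfigPolicy.RequestMetadata requestMetadata) {
        // With this change, configs() holds only the keys sent in the
        // AlterConfigs/IncrementalAlterConfigs request, not the resource's
        // full effective configuration.
        if (requestMetadata.resource().type() == ConfigResource.Type.TOPIC
                && requestMetadata.configs().containsKey("min.insync.replicas")) {
            throw new PolicyViolationException("min.insync.replicas cannot be updated");
        }
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}

The test's Policy class below relies on the same behavior: it records every RequestMetadata it receives, and the new assertions check that only the keys from the request appear in those recorded validations.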

diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index 4d48bf5a86..5b2213a65e 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -20,17 +20,19 @@ import kafka.log.LogConfig
 import kafka.server.{Defaults, KafkaConfig}
 import kafka.utils.TestUtils.assertFutureExceptionTypeEquals
 import kafka.utils.{Logging, TestInfoUtils, TestUtils}
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigsOptions, Config, ConfigEntry}
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, AlterConfigsOptions, Config, ConfigEntry}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, PolicyViolationException}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.policy.AlterConfigPolicy
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNull}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
 import scala.annotation.nowarn
+import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
 /**
@@ -121,6 +123,14 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
     val topicResource3 = new ConfigResource(ConfigResource.Type.TOPIC, topic3)
     createTopic(topic3, 1, 1)
 
+    // Set a mutable broker config
+    val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, brokers.head.config.brokerId.toString)
+    val brokerConfigs = Seq(new ConfigEntry(KafkaConfig.MessageMaxBytesProp, "50000")).asJava
+    val alterResult1 = client.alterConfigs(Map(brokerResource -> new Config(brokerConfigs)).asJava)
+    alterResult1.all.get
+    assertEquals(Set(KafkaConfig.MessageMaxBytesProp), validationsForResource(brokerResource).head.configs().keySet().asScala)
+    validations.clear()
+
     val topicConfigEntries1 = Seq(
       new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"),
       new ConfigEntry(LogConfig.MinInSyncReplicasProp, "2") // policy doesn't allow this
@@ -130,7 +140,6 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
 
     val topicConfigEntries3 = Seq(new ConfigEntry(LogConfig.MinInSyncReplicasProp, "-1")).asJava
 
-    val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, brokers.head.config.brokerId.toString)
     val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.SslTruststorePasswordProp, "12313")).asJava
 
     // Alter configs: second is valid, the others are invalid
@@ -146,6 +155,9 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
     alterResult.values.get(topicResource2).get
     assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), classOf[InvalidConfigurationException])
     assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException])
+    assertTrue(validationsForResource(brokerResource).isEmpty,
+      "Should not see the broker resource in the AlterConfig policy when the broker configs are not being updated.")
+    validations.clear()
 
     // Verify that the second resource was updated and the others were not
     ensureConsistentKRaftMetadata()
@@ -175,6 +187,9 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
     alterResult.values.get(topicResource2).get
     assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), classOf[InvalidConfigurationException])
     assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException])
+    assertTrue(validationsForResource(brokerResource).isEmpty,
+      "Should not see the broker resource in the AlterConfig policy when the broker configs are not being updated.")
+    validations.clear()
 
     // Verify that no resources are updated since validate_only = true
     ensureConsistentKRaftMetadata()
@@ -188,27 +203,44 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
     assertEquals("0.8", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
 
     assertNull(configs.get(brokerResource).get(KafkaConfig.SslTruststorePasswordProp).value)
+
+    // Do an incremental alter config on the broker, ensure we don't see the broker config we set earlier in the policy
+    alterResult = client.incrementalAlterConfigs(Map(
+      brokerResource ->
+        Seq(new AlterConfigOp(
+          new ConfigEntry(KafkaConfig.MaxConnectionsProp, "9999"), OpType.SET)
+        ).asJavaCollection
+    ).asJava)
+    alterResult.all.get
+    assertEquals(Set(KafkaConfig.MaxConnectionsProp), validationsForResource(brokerResource).head.configs().keySet().asScala)
   }
 
 }
 
 object AdminClientWithPoliciesIntegrationTest {
 
+  val validations = new mutable.ListBuffer[AlterConfigPolicy.RequestMetadata]()
+
+  def validationsForResource(resource: ConfigResource): Seq[AlterConfigPolicy.RequestMetadata] = {
+    validations.filter { req => req.resource().equals(resource) }.toSeq
+  }
+
   class Policy extends AlterConfigPolicy {
 
     var configs: Map[String, _] = _
     var closed = false
 
     def configure(configs: util.Map[String, _]): Unit = {
+      validations.clear()
       this.configs = configs.asScala.toMap
     }
 
     def validate(requestMetadata: AlterConfigPolicy.RequestMetadata): Unit = {
+      validations.append(requestMetadata)
       require(!closed, "Policy should not be closed")
       require(!configs.isEmpty, "configure should have been called with non empty configs")
       require(!requestMetadata.configs.isEmpty, "request configs should not be empty")
       require(requestMetadata.resource.name.nonEmpty, "resource name should not be empty")
-      require(requestMetadata.resource.name.contains("topic"))
       if (requestMetadata.configs.containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG))
         throw new PolicyViolationException("Min in sync replicas cannot be updated")
     }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index cde9d39569..4b8561a4d9 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -248,24 +248,26 @@ public class ConfigurationControlManager {
     private ApiError validateAlterConfig(ConfigResource configResource,
                                          List<ApiMessageAndVersion> newRecords,
                                          boolean newlyCreatedResource) {
-        Map<String, String> newConfigs = new HashMap<>();
+        Map<String, String> allConfigs = new HashMap<>();
+        Map<String, String> alteredConfigs = new HashMap<>();
         TimelineHashMap<String, String> existingConfigs = configData.get(configResource);
-        if (existingConfigs != null) newConfigs.putAll(existingConfigs);
+        if (existingConfigs != null) allConfigs.putAll(existingConfigs);
         for (ApiMessageAndVersion newRecord : newRecords) {
             ConfigRecord configRecord = (ConfigRecord) newRecord.message();
             if (configRecord.value() == null) {
-                newConfigs.remove(configRecord.name());
+                allConfigs.remove(configRecord.name());
             } else {
-                newConfigs.put(configRecord.name(), configRecord.value());
+                allConfigs.put(configRecord.name(), configRecord.value());
             }
+            alteredConfigs.put(configRecord.name(), configRecord.value());
         }
         try {
-            validator.validate(configResource, newConfigs);
+            validator.validate(configResource, allConfigs);
             if (!newlyCreatedResource) {
                 existenceChecker.accept(configResource);
             }
             if (alterConfigPolicy.isPresent()) {
-                alterConfigPolicy.get().validate(new RequestMetadata(configResource, newConfigs));
+                alterConfigPolicy.get().validate(new RequestMetadata(configResource, alteredConfigs));
             }
         } catch (ConfigException e) {
             return new ApiError(INVALID_CONFIG, e.getMessage());
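
To make the behavioral difference in validateAlterConfig concrete, here is a small standalone sketch (hypothetical config values, not taken from the patch): the merged view is still what the schema validator operates on, while only the entries from the request are handed to the AlterConfigPolicy.

import java.util.HashMap;
import java.util.Map;

public class AlteredVsAllConfigsExample {
    public static void main(String[] args) {
        // Hypothetical configs already stored for the resource.
        Map<String, String> existingConfigs = Map.of("retention.ms", "100");
        // Configs named in the incoming alter-configs request.
        Map<String, String> requestedConfigs = Map.of("min.insync.replicas", "2");

        // allConfigs: the existing configs with the requested changes applied;
        // this is still what the schema validator checks.
        Map<String, String> allConfigs = new HashMap<>(existingConfigs);
        allConfigs.putAll(requestedConfigs);

        // alteredConfigs: only the keys from the request; after this fix this
        // is what the AlterConfigPolicy receives in KRaft mode.
        Map<String, String> alteredConfigs = new HashMap<>(requestedConfigs);

        System.out.println("validator sees " + allConfigs.keySet());  // retention.ms and min.insync.replicas
        System.out.println("policy sees " + alteredConfigs.keySet()); // min.insync.replicas only
    }
}
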
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 007e84ffc0..1c59892444 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -281,18 +281,30 @@ public class ConfigurationControlManagerTest {
     public void testIncrementalAlterConfigsWithPolicy() {
         MockAlterConfigsPolicy policy = new MockAlterConfigsPolicy(asList(
             new RequestMetadata(MYTOPIC, Collections.emptyMap()),
-            new RequestMetadata(BROKER0, toMap(entry("foo.bar", "123"),
-                entry("quux", "456")))));
+            new RequestMetadata(BROKER0, toMap(
+                entry("foo.bar", "123"),
+                entry("quux", "456"),
+                entry("broker.config.to.remove", null)))));
         ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
             setKafkaConfigSchema(SCHEMA).
             setAlterConfigPolicy(Optional.of(policy)).
             build();
+        // Existing configs should not be passed to the policy
+        manager.replay(new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
+                setName("broker.config").setValue("123"));
+        manager.replay(new ConfigRecord().setResourceType(TOPIC.id()).setResourceName(MYTOPIC.name()).
+                setName("topic.config").setValue("123"));
+        manager.replay(new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
+                setName("broker.config.to.remove").setValue("123"));
         assertEquals(ControllerResult.atomicOf(asList(new ApiMessageAndVersion(
                 new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
                     setName("foo.bar").setValue("123"), CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion(
                 new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
-                    setName("quux").setValue("456"), CONFIG_RECORD.highestSupportedVersion())),
-            toMap(entry(MYTOPIC, new ApiError(Errors.POLICY_VIOLATION,
+                    setName("quux").setValue("456"), CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion(
+                new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
+                    setName("broker.config.to.remove").setValue(null), CONFIG_RECORD.highestSupportedVersion())
+                ),
+                toMap(entry(MYTOPIC, new ApiError(Errors.POLICY_VIOLATION,
                     "Expected: AlterConfigPolicy.RequestMetadata(resource=ConfigResource(" +
                     "type=TOPIC, name='mytopic'), configs={}). Got: " +
                     "AlterConfigPolicy.RequestMetadata(resource=ConfigResource(" +
@@ -301,8 +313,10 @@ public class ConfigurationControlManagerTest {
             manager.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(
                 entry("foo.bar", entry(SET, "123")))),
                 entry(BROKER0, toMap(
-                entry("foo.bar", entry(SET, "123")),
-                entry("quux", entry(SET, "456"))))),
+                        entry("foo.bar", entry(SET, "123")),
+                        entry("quux", entry(SET, "456")),
+                        entry("broker.config.to.remove", entry(DELETE, null))
+                ))),
                 true));
     }