You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/09/01 22:59:37 UTC

[kafka] branch trunk updated: KAFKA-14195: Fix KRaft AlterConfig policy usage for Legacy/Full case (#12578)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ef65b6e566 KAFKA-14195: Fix KRaft AlterConfig policy usage for Legacy/Full case (#12578)
ef65b6e566 is described below

commit ef65b6e566ef69b2f9b58038c98a5993563d7a68
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Thu Sep 1 18:59:17 2022 -0400

    KAFKA-14195: Fix KRaft AlterConfig policy usage for Legacy/Full case (#12578)
    
    #12374 adjusted the invocation of the alter configs policy check in KRaft to match the behavior in ZooKeeper, which is to only provide the configs that were explicitly sent in the request. While the code was correct for the incremental alter configs case, the code actually included the implicit deletions for the legacy/non-incremental alter configs case, and those implicit deletions are not included in the ZooKeeper-based invocation. This patch adds a test to check for this and adjust [...]
    
    Reviewers: José Armando García Sancio <js...@gmail.com>, Colin P. McCabe <cm...@apache.org>
---
 .../controller/ConfigurationControlManager.java    | 39 +++++++++++++++-------
 .../ConfigurationControlManagerTest.java           | 22 ++++++++++++
 2 files changed, 49 insertions(+), 12 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 4d6736b878..5c9a73ca98 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.config.ConfigResource.Type;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.metadata.KafkaConfigSchema;
@@ -238,7 +239,7 @@ public class ConfigurationControlManager {
                     setValue(newValue), CONFIG_RECORD.highestSupportedVersion()));
             }
         }
-        ApiError error = validateAlterConfig(configResource, newRecords, newlyCreatedResource);
+        ApiError error = validateAlterConfig(configResource, newRecords, Collections.emptyList(), newlyCreatedResource);
         if (error.isFailure()) {
             outputResults.put(configResource, error);
             return;
@@ -248,20 +249,27 @@ public class ConfigurationControlManager {
     }
 
     private ApiError validateAlterConfig(ConfigResource configResource,
-                                         List<ApiMessageAndVersion> newRecords,
+                                         List<ApiMessageAndVersion> recordsExplicitlyAltered,
+                                         List<ApiMessageAndVersion> recordsImplicitlyDeleted,
                                          boolean newlyCreatedResource) {
         Map<String, String> allConfigs = new HashMap<>();
-        Map<String, String> alteredConfigs = new HashMap<>();
+        Map<String, String> alteredConfigsForAlterConfigPolicyCheck = new HashMap<>();
         TimelineHashMap<String, String> existingConfigs = configData.get(configResource);
         if (existingConfigs != null) allConfigs.putAll(existingConfigs);
-        for (ApiMessageAndVersion newRecord : newRecords) {
+        for (ApiMessageAndVersion newRecord : recordsExplicitlyAltered) {
             ConfigRecord configRecord = (ConfigRecord) newRecord.message();
             if (configRecord.value() == null) {
                 allConfigs.remove(configRecord.name());
             } else {
                 allConfigs.put(configRecord.name(), configRecord.value());
             }
-            alteredConfigs.put(configRecord.name(), configRecord.value());
+            alteredConfigsForAlterConfigPolicyCheck.put(configRecord.name(), configRecord.value());
+        }
+        for (ApiMessageAndVersion recordImplicitlyDeleted : recordsImplicitlyDeleted) {
+            ConfigRecord configRecord = (ConfigRecord) recordImplicitlyDeleted.message();
+            allConfigs.remove(configRecord.name());
+            // As per KAFKA-14195, do not include implicit deletions caused by using the legacy AlterConfigs API
+            // in the list passed to the policy in order to maintain backwards compatibility
         }
         try {
             validator.validate(configResource, allConfigs);
@@ -269,12 +277,17 @@ public class ConfigurationControlManager {
                 existenceChecker.accept(configResource);
             }
             if (alterConfigPolicy.isPresent()) {
-                alterConfigPolicy.get().validate(new RequestMetadata(configResource, alteredConfigs));
+                alterConfigPolicy.get().validate(new RequestMetadata(configResource, alteredConfigsForAlterConfigPolicyCheck));
             }
         } catch (ConfigException e) {
             return new ApiError(INVALID_CONFIG, e.getMessage());
         } catch (Throwable e) {
-            return ApiError.fromThrowable(e);
+            // return the corresponding API error, but emit the stack trace first if it is an unknown server error
+            ApiError apiError = ApiError.fromThrowable(e);
+            if (apiError.error() == Errors.UNKNOWN_SERVER_ERROR) {
+                log.error("Unknown server error validating Alter Configs", e);
+            }
+            return apiError;
         }
         return ApiError.NONE;
     }
@@ -310,7 +323,7 @@ public class ConfigurationControlManager {
                                            boolean newlyCreatedResource,
                                            List<ApiMessageAndVersion> outputRecords,
                                            Map<ConfigResource, ApiError> outputResults) {
-        List<ApiMessageAndVersion> newRecords = new ArrayList<>();
+        List<ApiMessageAndVersion> recordsExplicitlyAltered = new ArrayList<>();
         Map<String, String> currentConfigs = configData.get(configResource);
         if (currentConfigs == null) {
             currentConfigs = Collections.emptyMap();
@@ -321,28 +334,30 @@ public class ConfigurationControlManager {
             String currentValue = currentConfigs.get(key);
             if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) {
                 // KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers
-                newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
+                recordsExplicitlyAltered.add(new ApiMessageAndVersion(new ConfigRecord().
                     setResourceType(configResource.type().id()).
                     setResourceName(configResource.name()).
                     setName(key).
                     setValue(newValue), CONFIG_RECORD.highestSupportedVersion()));
             }
         }
+        List<ApiMessageAndVersion> recordsImplicitlyDeleted = new ArrayList<>();
         for (String key : currentConfigs.keySet()) {
             if (!newConfigs.containsKey(key)) {
-                newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
+                recordsImplicitlyDeleted.add(new ApiMessageAndVersion(new ConfigRecord().
                     setResourceType(configResource.type().id()).
                     setResourceName(configResource.name()).
                     setName(key).
                     setValue(null), CONFIG_RECORD.highestSupportedVersion()));
             }
         }
-        ApiError error = validateAlterConfig(configResource, newRecords, newlyCreatedResource);
+        ApiError error = validateAlterConfig(configResource, recordsExplicitlyAltered, recordsImplicitlyDeleted, newlyCreatedResource);
         if (error.isFailure()) {
             outputResults.put(configResource, error);
             return;
         }
-        outputRecords.addAll(newRecords);
+        outputRecords.addAll(recordsExplicitlyAltered);
+        outputRecords.addAll(recordsImplicitlyDeleted);
         outputResults.put(configResource, ApiError.NONE);
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 1c59892444..02bd6e4d7e 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -320,10 +320,32 @@ public class ConfigurationControlManagerTest {
                 true));
     }
 
+    private static class CheckForNullValuesPolicy implements AlterConfigPolicy {
+        @Override
+        public void validate(RequestMetadata actual) throws PolicyViolationException {
+            actual.configs().forEach((key, value) -> {
+                if (value == null) {
+                    throw new PolicyViolationException("Legacy Alter Configs should not see null values");
+                }
+            });
+        }
+
+        @Override
+        public void close() throws Exception {
+            // empty
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            // empty
+        }
+    }
+
     @Test
     public void testLegacyAlterConfigs() {
         ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
             setKafkaConfigSchema(SCHEMA).
+            setAlterConfigPolicy(Optional.of(new CheckForNullValuesPolicy())).
             build();
         List<ApiMessageAndVersion> expectedRecords1 = asList(
             new ApiMessageAndVersion(new ConfigRecord().