You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2023/05/12 18:21:12 UTC

[kafka] branch trunk updated: KAFKA-14962: Trim whitespace from ACL configuration (#13670)

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bb10ae42734 KAFKA-14962: Trim whitespace from ACL configuration (#13670)
bb10ae42734 is described below

commit bb10ae4273451468b26fad755bfa41001ac6849c
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Fri May 12 20:21:00 2023 +0200

    KAFKA-14962: Trim whitespace from ACL configuration (#13670)
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>, Christo Lolov <lo...@amazon.com>
---
 .../org/apache/kafka/common/config/AbstractConfig.java |  3 ++-
 .../apache/kafka/common/config/AbstractConfigTest.java |  2 ++
 .../kafka/security/authorizer/AclAuthorizer.scala      | 18 +++++++++---------
 .../kafka/security/authorizer/AuthorizerTest.scala     | 18 ++++++++++++++++++
 .../kafka/metadata/authorizer/StandardAuthorizer.java  |  4 ++--
 5 files changed, 33 insertions(+), 12 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index bbfd15c12a3..2280813db1e 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -194,7 +194,8 @@ public class AbstractConfig {
     }
 
     public String getString(String key) {
-        return (String) get(key);
+        final String res = (String) get(key);
+        return res == null ? res : res.trim();
     }
 
     public ConfigDef.Type typeOf(String key) {
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index c0c6f8cee37..5859dc1dc12 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -48,8 +48,10 @@ public class AbstractConfigTest {
 
     @Test
     public void testConfiguredInstances() {
+        testValidInputs("    ");
         testValidInputs("");
         testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter");
+        testValidInputs(" org.apache.kafka.common.metrics.FakeMetricsReporter ");
         testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter, org.apache.kafka.common.metrics.FakeMetricsReporter");
         testInvalidInputs(",");
         testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
index ce3e2bd0de3..34070d16378 100644
--- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
@@ -97,7 +97,7 @@ object AclAuthorizer {
 
   private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = {
     val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + KafkaConfig.ZkSslClientEnableProp).
-      map(_.toString).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
+      map(_.toString.trim).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
     if (!zkSslClientEnable)
       new ZKClientConfig
     else {
@@ -109,9 +109,9 @@ object AclAuthorizer {
         configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue =>
           zkClientConfig.setProperty(sysProp,
             if (kafkaProp == KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp)
-              (prefixedValue.toString.toUpperCase == "HTTPS").toString
+              (prefixedValue.toString.trim.toUpperCase == "HTTPS").toString
             else
-              prefixedValue.toString)
+              prefixedValue.toString.trim)
         }
       }
       zkClientConfig
@@ -185,22 +185,22 @@ class AclAuthorizer extends Authorizer with Logging {
   override def configure(javaConfigs: util.Map[String, _]): Unit = {
     val configs = javaConfigs.asScala
     val props = new java.util.Properties()
-    configs.forKeyValue { (key, value) => props.put(key, value.toString) }
+    configs.forKeyValue { (key, value) => props.put(key, value.toString.trim) }
 
     superUsers = configs.get(AclAuthorizer.SuperUsersProp).collect {
       case str: String if str.nonEmpty => str.split(";").map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toSet
     }.getOrElse(Set.empty[KafkaPrincipal])
 
-    shouldAllowEveryoneIfNoAclIsFound = configs.get(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
+    shouldAllowEveryoneIfNoAclIsFound = configs.get(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.trim.toBoolean)
 
     // Use `KafkaConfig` in order to get the default ZK config values if not present in `javaConfigs`. Note that this
     // means that `KafkaConfig.zkConnect` must always be set by the user (even if `AclAuthorizer.ZkUrlProp` is also
     // set).
     val kafkaConfig = KafkaConfig.fromProps(props, doLog = false)
-    val zkUrl = configs.get(AclAuthorizer.ZkUrlProp).map(_.toString).getOrElse(kafkaConfig.zkConnect)
-    val zkConnectionTimeoutMs = configs.get(AclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs)
-    val zkSessionTimeOutMs = configs.get(AclAuthorizer.ZkSessionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs)
-    val zkMaxInFlightRequests = configs.get(AclAuthorizer.ZkMaxInFlightRequests).map(_.toString.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests)
+    val zkUrl = configs.get(AclAuthorizer.ZkUrlProp).map(_.toString.trim).getOrElse(kafkaConfig.zkConnect)
+    val zkConnectionTimeoutMs = configs.get(AclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.trim.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs)
+    val zkSessionTimeOutMs = configs.get(AclAuthorizer.ZkSessionTimeOutProp).map(_.toString.trim.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs)
+    val zkMaxInFlightRequests = configs.get(AclAuthorizer.ZkMaxInFlightRequests).map(_.toString.trim.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests)
 
     val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(kafkaConfig, configs)
     val time = Time.SYSTEM
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
index c39b785e38a..f855a58e3cb 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
@@ -326,6 +326,24 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     }
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array(KRAFT, ZK))
+  def testAclConfigWithWhitespace(quorum: String): Unit = {
+    val props = properties
+    props.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, " true")
+    // replace all property values with leading & trailing whitespaces
+    props.replaceAll((_,v) => " " + v + " ")
+    val cfg = KafkaConfig.fromProps(props)
+    var testAuthorizer: Authorizer = null
+    try {
+      testAuthorizer = createAuthorizer(cfg.originals)
+      assertTrue(authorize(testAuthorizer, requestContext, READ, resource),
+        "when acls = null or [],  authorizer should allow op with allow.everyone = true.")
+    } finally {
+      testAuthorizer.close()
+    }
+  }
+
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array(KRAFT, ZK))
   def testAclManagementAPIs(quorum: String): Unit = {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
index 0725659ae4f..47f066c34b4 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
@@ -169,7 +169,7 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
         AuthorizationResult defaultResult = getDefaultResult(configs);
         int nodeId;
         try {
-            nodeId = Integer.parseInt(configs.get("node.id").toString());
+            nodeId = Integer.parseInt(configs.get("node.id").toString().trim());
         } catch (Exception e) {
             nodeId = -1;
         }
@@ -204,6 +204,6 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
     static AuthorizationResult getDefaultResult(Map<String, ?> configs) {
         Object configValue = configs.get(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG);
         if (configValue == null) return DENIED;
-        return Boolean.valueOf(configValue.toString()) ? ALLOWED : DENIED;
+        return Boolean.parseBoolean(configValue.toString().trim()) ? ALLOWED : DENIED;
     }
 }