You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2023/05/12 18:21:12 UTC
[kafka] branch trunk updated: KAFKA-14962: Trim whitespace from ACL configuration (#13670)
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bb10ae42734 KAFKA-14962: Trim whitespace from ACL configuration (#13670)
bb10ae42734 is described below
commit bb10ae4273451468b26fad755bfa41001ac6849c
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Fri May 12 20:21:00 2023 +0200
KAFKA-14962: Trim whitespace from ACL configuration (#13670)
Reviewers: Manikumar Reddy <ma...@gmail.com>, Christo Lolov <lo...@amazon.com>
---
.../org/apache/kafka/common/config/AbstractConfig.java | 3 ++-
.../apache/kafka/common/config/AbstractConfigTest.java | 2 ++
.../kafka/security/authorizer/AclAuthorizer.scala | 18 +++++++++---------
.../kafka/security/authorizer/AuthorizerTest.scala | 18 ++++++++++++++++++
.../kafka/metadata/authorizer/StandardAuthorizer.java | 4 ++--
5 files changed, 33 insertions(+), 12 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index bbfd15c12a3..2280813db1e 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -194,7 +194,8 @@ public class AbstractConfig {
}
public String getString(String key) {
- return (String) get(key);
+ final String res = (String) get(key);
+ return res == null ? res : res.trim();
}
public ConfigDef.Type typeOf(String key) {
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index c0c6f8cee37..5859dc1dc12 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -48,8 +48,10 @@ public class AbstractConfigTest {
@Test
public void testConfiguredInstances() {
+ testValidInputs(" ");
testValidInputs("");
testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter");
+ testValidInputs(" org.apache.kafka.common.metrics.FakeMetricsReporter ");
testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter, org.apache.kafka.common.metrics.FakeMetricsReporter");
testInvalidInputs(",");
testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
index ce3e2bd0de3..34070d16378 100644
--- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
@@ -97,7 +97,7 @@ object AclAuthorizer {
private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = {
val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + KafkaConfig.ZkSslClientEnableProp).
- map(_.toString).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
+ map(_.toString.trim).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
if (!zkSslClientEnable)
new ZKClientConfig
else {
@@ -109,9 +109,9 @@ object AclAuthorizer {
configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue =>
zkClientConfig.setProperty(sysProp,
if (kafkaProp == KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp)
- (prefixedValue.toString.toUpperCase == "HTTPS").toString
+ (prefixedValue.toString.trim.toUpperCase == "HTTPS").toString
else
- prefixedValue.toString)
+ prefixedValue.toString.trim)
}
}
zkClientConfig
@@ -185,22 +185,22 @@ class AclAuthorizer extends Authorizer with Logging {
override def configure(javaConfigs: util.Map[String, _]): Unit = {
val configs = javaConfigs.asScala
val props = new java.util.Properties()
- configs.forKeyValue { (key, value) => props.put(key, value.toString) }
+ configs.forKeyValue { (key, value) => props.put(key, value.toString.trim) }
superUsers = configs.get(AclAuthorizer.SuperUsersProp).collect {
case str: String if str.nonEmpty => str.split(";").map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toSet
}.getOrElse(Set.empty[KafkaPrincipal])
- shouldAllowEveryoneIfNoAclIsFound = configs.get(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
+ shouldAllowEveryoneIfNoAclIsFound = configs.get(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.trim.toBoolean)
// Use `KafkaConfig` in order to get the default ZK config values if not present in `javaConfigs`. Note that this
// means that `KafkaConfig.zkConnect` must always be set by the user (even if `AclAuthorizer.ZkUrlProp` is also
// set).
val kafkaConfig = KafkaConfig.fromProps(props, doLog = false)
- val zkUrl = configs.get(AclAuthorizer.ZkUrlProp).map(_.toString).getOrElse(kafkaConfig.zkConnect)
- val zkConnectionTimeoutMs = configs.get(AclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs)
- val zkSessionTimeOutMs = configs.get(AclAuthorizer.ZkSessionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs)
- val zkMaxInFlightRequests = configs.get(AclAuthorizer.ZkMaxInFlightRequests).map(_.toString.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests)
+ val zkUrl = configs.get(AclAuthorizer.ZkUrlProp).map(_.toString.trim).getOrElse(kafkaConfig.zkConnect)
+ val zkConnectionTimeoutMs = configs.get(AclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.trim.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs)
+ val zkSessionTimeOutMs = configs.get(AclAuthorizer.ZkSessionTimeOutProp).map(_.toString.trim.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs)
+ val zkMaxInFlightRequests = configs.get(AclAuthorizer.ZkMaxInFlightRequests).map(_.toString.trim.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests)
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(kafkaConfig, configs)
val time = Time.SYSTEM
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
index c39b785e38a..f855a58e3cb 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
@@ -326,6 +326,24 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
}
}
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array(KRAFT, ZK))
+ def testAclConfigWithWhitespace(quorum: String): Unit = {
+ val props = properties
+ props.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, " true")
+ // replace all property values with leading & trailing whitespaces
+ props.replaceAll((_,v) => " " + v + " ")
+ val cfg = KafkaConfig.fromProps(props)
+ var testAuthorizer: Authorizer = null
+ try {
+ testAuthorizer = createAuthorizer(cfg.originals)
+ assertTrue(authorize(testAuthorizer, requestContext, READ, resource),
+ "when acls = null or [], authorizer should allow op with allow.everyone = true.")
+ } finally {
+ testAuthorizer.close()
+ }
+ }
+
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array(KRAFT, ZK))
def testAclManagementAPIs(quorum: String): Unit = {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
index 0725659ae4f..47f066c34b4 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
@@ -169,7 +169,7 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
AuthorizationResult defaultResult = getDefaultResult(configs);
int nodeId;
try {
- nodeId = Integer.parseInt(configs.get("node.id").toString());
+ nodeId = Integer.parseInt(configs.get("node.id").toString().trim());
} catch (Exception e) {
nodeId = -1;
}
@@ -204,6 +204,6 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
static AuthorizationResult getDefaultResult(Map<String, ?> configs) {
Object configValue = configs.get(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG);
if (configValue == null) return DENIED;
- return Boolean.valueOf(configValue.toString()) ? ALLOWED : DENIED;
+ return Boolean.parseBoolean(configValue.toString().trim()) ? ALLOWED : DENIED;
}
}