Posted to commits@kafka.apache.org by jg...@apache.org on 2022/05/23 16:47:21 UTC

[kafka] branch trunk updated: KAFKA-13923; Generalize authorizer system test for kraft (#12190)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b5699b5ccd KAFKA-13923; Generalize authorizer system test for kraft (#12190)
b5699b5ccd is described below

commit b5699b5ccd68ae0b27530a6cfcf14a5f3099eef0
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon May 23 09:47:14 2022 -0700

    KAFKA-13923; Generalize authorizer system test for kraft (#12190)
    
    Rename `ZooKeeperAuthorizerTest` to `AuthorizerTest` and add support for KRaft's `StandardAuthorizer` implementation.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 tests/kafkatest/services/kafka/kafka.py            |  3 +-
 ...eeper_authorizer_test.py => authorizer_test.py} | 41 +++++++++++++---------
 2 files changed, 26 insertions(+), 18 deletions(-)
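
The substance of the change is that one test now exercises both default authorizer implementations: the ZooKeeper-backed `kafka.security.authorizer.AclAuthorizer` and KRaft's `org.apache.kafka.metadata.authorizer.StandardAuthorizer`. In broker terms, the test boils down to setting the standard `authorizer.class.name` and `super.users` properties before restarting the cluster. The sketch below is illustrative only; the helper function is hypothetical and not part of `KafkaService`, and the default principal simply mirrors the one the system test uses for its SSL broker listener.

    # Illustrative sketch, not part of the commit: the broker-side properties the
    # test effectively toggles. The property names are standard Kafka configs;
    # the helper itself is hypothetical.
    def broker_authorizer_props(authorizer_class, broker_principal="User:CN=systemtest"):
        """Server properties a broker needs once ACL enforcement is turned on."""
        return {
            # ZooKeeper clusters use kafka.security.authorizer.AclAuthorizer;
            # KRaft clusters use org.apache.kafka.metadata.authorizer.StandardAuthorizer.
            "authorizer.class.name": authorizer_class,
            # The broker's own principal must be a super user so inter-broker
            # (and broker-to-controller) traffic keeps working after the restart.
            "super.users": broker_principal,
        }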

diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 55b5b7b871..bb7297475e 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -158,7 +158,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     METADATA_SNAPSHOT_SEARCH_STR = "%s/__cluster_metadata-0/*.checkpoint" % METADATA_LOG_DIR
     METADATA_FIRST_LOG = "%s/__cluster_metadata-0/00000000000000000000.log" % METADATA_LOG_DIR
     # Kafka Authorizer
-    ACL_AUTHORIZER = "kafka.security.authorizer.AclAuthorizer"
+    ZK_ACL_AUTHORIZER = "kafka.security.authorizer.AclAuthorizer"
+    KRAFT_ACL_AUTHORIZER = "org.apache.kafka.metadata.authorizer.StandardAuthorizer"
     HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
     INTERBROKER_LISTENER_NAME = 'INTERNAL'
     JAAS_CONF_PROPERTY = "java.security.auth.login.config=/mnt/security/jaas.conf"
diff --git a/tests/kafkatest/tests/core/zookeeper_authorizer_test.py b/tests/kafkatest/tests/core/authorizer_test.py
similarity index 80%
rename from tests/kafkatest/tests/core/zookeeper_authorizer_test.py
rename to tests/kafkatest/tests/core/authorizer_test.py
index 97c20c0c41..20994c5b0d 100644
--- a/tests/kafkatest/tests/core/zookeeper_authorizer_test.py
+++ b/tests/kafkatest/tests/core/authorizer_test.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 from ducktape.cluster.remoteaccount import RemoteCommandError
-from ducktape.mark import matrix
+from ducktape.mark import parametrize
 from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 
@@ -22,8 +22,8 @@ from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.security.kafka_acls import ACLs
 
-class ZooKeeperAuthorizerTest(Test):
-    """Tests that the ZooKeeper-based Authorizer works wth both ZooKeeper-based and KRaft clusters.
+class AuthorizerTest(Test):
+    """Tests that the default Authorizer implementations work with both ZooKeeper-based and KRaft clusters.
     Alters client quotas, making sure it works.
     Rolls Kafka with an authorizer.
     Alters client quotas, making sure it fails.
@@ -36,22 +36,29 @@ class ZooKeeperAuthorizerTest(Test):
     """
 
     def __init__(self, test_context):
-        super(ZooKeeperAuthorizerTest, self).__init__(test_context=test_context)
-
-        self.topic = "test_topic"
-        # setup ZooKeeper even with KRaft
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
-                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}},
-                                  controller_num_nodes_override=1, allow_zk_with_kraft=True)
+        super(AuthorizerTest, self).__init__(test_context=test_context)
+        self.test_context = test_context
+
     def setUp(self):
-        # start ZooKeeper even with KRaft
-        self.zk.start()
         self.acls = ACLs(self.test_context)
 
     @cluster(num_nodes=4)
-    @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_authorizer(self, metadata_quorum):
+    @parametrize(metadata_quorum=quorum.remote_kraft, authorizer_class=KafkaService.KRAFT_ACL_AUTHORIZER)
+    @parametrize(metadata_quorum=quorum.remote_kraft, authorizer_class=KafkaService.ZK_ACL_AUTHORIZER)
+    @parametrize(metadata_quorum=quorum.zk, authorizer_class=KafkaService.ZK_ACL_AUTHORIZER)
+    def test_authorizer(self, metadata_quorum, authorizer_class):
+        topics = {"test_topic": {"partitions": 1, "replication-factor": 1}}
+
+        if (authorizer_class == KafkaService.KRAFT_ACL_AUTHORIZER):
+            self.zk = None
+        else:
+            self.zk = ZookeeperService(self.test_context, num_nodes=1)
+            self.zk.start()
+
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
+                                  topics=topics, controller_num_nodes_override=1,
+                                  allow_zk_with_kraft=True)
+
         broker_security_protocol = "SSL"
         broker_principal = "User:CN=systemtest"
         client_security_protocol = "SASL_PLAINTEXT"
@@ -80,11 +87,11 @@ class ZooKeeperAuthorizerTest(Test):
             # we need to explicitly reconfigure/restart any remote controller quorum
             self.kafka.logger.info("Restarting Remote KRaft Controller with authorizer and broker principal as super user")
             controller_quorum = self.kafka.controller_quorum
-            controller_quorum.authorizer_class_name = KafkaService.ACL_AUTHORIZER
+            controller_quorum.authorizer_class_name = authorizer_class
             controller_quorum.server_prop_overrides = [["super.users", broker_principal]] # for broker to work with an authorizer
             controller_quorum.restart_cluster()
         self.kafka.logger.info("Restarting Kafka with authorizer and broker principal as super user")
-        self.kafka.authorizer_class_name = KafkaService.ACL_AUTHORIZER
+        self.kafka.authorizer_class_name = authorizer_class
         self.kafka.server_prop_overrides = [["super.users", broker_principal]] # for broker to work with an authorizer
         self.kafka.restart_cluster()
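
The switch from `@matrix` to `@parametrize` is deliberate: the valid combinations are not a full cross product, since `StandardAuthorizer` only applies to KRaft clusters while `AclAuthorizer` can back either quorum, so the test pins each supported pairing explicitly. A rough sketch of the difference between the two ducktape decorators follows; the quorum and authorizer strings are placeholders rather than the kafkatest `quorum` module's actual constants.

    # Rough sketch of the two ducktape decorators; values below are placeholders.
    from ducktape.mark import matrix, parametrize
    from ducktape.tests.test import Test

    class DecoratorSketch(Test):
        @matrix(metadata_quorum=["ZK", "REMOTE_KRAFT"])
        def test_every_listed_value(self, metadata_quorum):
            # @matrix expands into one test case per entry (a cross product across kwargs).
            pass

        @parametrize(metadata_quorum="REMOTE_KRAFT", authorizer_class="StandardAuthorizer")
        @parametrize(metadata_quorum="ZK", authorizer_class="AclAuthorizer")
        def test_selected_pairings(self, metadata_quorum, authorizer_class):
            # @parametrize pins only the explicitly listed combinations.
            pass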