You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/14 16:51:01 UTC

[GitHub] [kafka] soarez commented on a diff in pull request #12843: Remove use of "authorizer-properties" from EndToEndAuthorizerTest

soarez commented on code in PR #12843:
URL: https://github.com/apache/kafka/pull/12843#discussion_r1020949539


##########
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala:
##########
@@ -96,96 +90,53 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   def clientPrincipal: KafkaPrincipal
   def kafkaPrincipal: KafkaPrincipal
 
-  // Arguments to AclCommand to set ACLs.
-  def clusterActionArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--cluster",
-                                          s"--operation=ClusterAction",
-                                          s"--allow-principal=$kafkaPrincipal")
-  // necessary to create SCRAM credentials via the admin client using the broker's credentials
-  // without this we would need to create the SCRAM credentials via ZooKeeper
-  def clusterAlterArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--cluster",
-                                          s"--operation=Alter",
-                                          s"--allow-principal=$kafkaPrincipal")
-  def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$wildcard",
-                                          s"--operation=Read",
-                                          s"--allow-principal=$kafkaPrincipal")
-  def produceAclArgs(topic: String): Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$topic",
-                                          s"--producer",
-                                          s"--allow-principal=$clientPrincipal")
-  def describeAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$topic",
-                                          s"--operation=Describe",
-                                          s"--allow-principal=$clientPrincipal")
-  def deleteDescribeAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--remove",
-                                          s"--force",
-                                          s"--topic=$topic",
-                                          s"--operation=Describe",
-                                          s"--allow-principal=$clientPrincipal")
-  def deleteWriteAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--remove",
-                                          s"--force",
-                                          s"--topic=$topic",
-                                          s"--operation=Write",
-                                          s"--allow-principal=$clientPrincipal")
-  def consumeAclArgs(topic: String): Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$topic",
-                                          s"--group=$group",
-                                          s"--consumer",
-                                          s"--allow-principal=$clientPrincipal")
-  def groupAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--group=$group",
-                                          s"--operation=Read",
-                                          s"--allow-principal=$clientPrincipal")
-  def produceConsumeWildcardAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$wildcard",
-                                          s"--group=$wildcard",
-                                          s"--consumer",
-                                          s"--producer",
-                                          s"--allow-principal=$clientPrincipal")
-  def produceConsumePrefixedAclsArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$topicPrefix",
-                                          s"--group=$groupPrefix",
-                                          s"--resource-pattern-type=prefixed",
-                                          s"--consumer",
-                                          s"--producer",
-                                          s"--allow-principal=$clientPrincipal")
-
   def ClusterActionAndClusterAlterAcls = Set(new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, CLUSTER_ACTION, ALLOW),
     new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, ALTER, ALLOW))
-  def TopicBrokerReadAcl = Set(new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, READ, ALLOW))
   def GroupReadAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, READ, ALLOW))
   def TopicReadAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, READ, ALLOW))
   def TopicWriteAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, WRITE, ALLOW))
   def TopicDescribeAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, DESCRIBE, ALLOW))
   def TopicCreateAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, CREATE, ALLOW))
-  // The next two configuration parameters enable ZooKeeper secure ACLs
-  // and sets the Kafka authorizer, both necessary to enable security.
-  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
-  this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, authorizerClass.getName)
+
+  def AclClusterAction = new AclBinding(clusterResource,
+    new AccessControlEntry(kafkaPrincipal.toString, "*", AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW))
+  def AclAlter = new AclBinding(clusterResource,
+    new AccessControlEntry(kafkaPrincipal.toString, "*", AclOperation.ALTER, AclPermissionType.ALLOW))
+
+  def AclTopicWrite(tptopicresource : ResourcePattern = topicResource) = new AclBinding(tptopicresource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.ALLOW))
+  def AclTopicCreate(tptopicresource : ResourcePattern = topicResource) = new AclBinding(tptopicresource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.CREATE, AclPermissionType.ALLOW))
+  def AclTopicDescribe(tptopicresource : ResourcePattern = topicResource) = new AclBinding(tptopicresource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
+  def AclTopicRead(tptopicresource : ResourcePattern = topicResource) = new AclBinding(tptopicresource,

Review Comment:
   s/tptopicresource/topicResource/g



##########
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala:
##########
@@ -96,96 +90,53 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   def clientPrincipal: KafkaPrincipal
   def kafkaPrincipal: KafkaPrincipal
 
-  // Arguments to AclCommand to set ACLs.
-  def clusterActionArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--cluster",
-                                          s"--operation=ClusterAction",
-                                          s"--allow-principal=$kafkaPrincipal")
-  // necessary to create SCRAM credentials via the admin client using the broker's credentials
-  // without this we would need to create the SCRAM credentials via ZooKeeper
-  def clusterAlterArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--cluster",
-                                          s"--operation=Alter",
-                                          s"--allow-principal=$kafkaPrincipal")
-  def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$wildcard",
-                                          s"--operation=Read",
-                                          s"--allow-principal=$kafkaPrincipal")
-  def produceAclArgs(topic: String): Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$topic",
-                                          s"--producer",
-                                          s"--allow-principal=$clientPrincipal")
-  def describeAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$topic",
-                                          s"--operation=Describe",
-                                          s"--allow-principal=$clientPrincipal")
-  def deleteDescribeAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--remove",
-                                          s"--force",
-                                          s"--topic=$topic",
-                                          s"--operation=Describe",
-                                          s"--allow-principal=$clientPrincipal")
-  def deleteWriteAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--remove",
-                                          s"--force",
-                                          s"--topic=$topic",
-                                          s"--operation=Write",
-                                          s"--allow-principal=$clientPrincipal")
-  def consumeAclArgs(topic: String): Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$topic",
-                                          s"--group=$group",
-                                          s"--consumer",
-                                          s"--allow-principal=$clientPrincipal")
-  def groupAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--group=$group",
-                                          s"--operation=Read",
-                                          s"--allow-principal=$clientPrincipal")
-  def produceConsumeWildcardAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$wildcard",
-                                          s"--group=$wildcard",
-                                          s"--consumer",
-                                          s"--producer",
-                                          s"--allow-principal=$clientPrincipal")
-  def produceConsumePrefixedAclsArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$topicPrefix",
-                                          s"--group=$groupPrefix",
-                                          s"--resource-pattern-type=prefixed",
-                                          s"--consumer",
-                                          s"--producer",
-                                          s"--allow-principal=$clientPrincipal")
-
   def ClusterActionAndClusterAlterAcls = Set(new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, CLUSTER_ACTION, ALLOW),
     new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, ALTER, ALLOW))
-  def TopicBrokerReadAcl = Set(new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, READ, ALLOW))
   def GroupReadAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, READ, ALLOW))
   def TopicReadAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, READ, ALLOW))
   def TopicWriteAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, WRITE, ALLOW))
   def TopicDescribeAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, DESCRIBE, ALLOW))
   def TopicCreateAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, CREATE, ALLOW))
-  // The next two configuration parameters enable ZooKeeper secure ACLs
-  // and sets the Kafka authorizer, both necessary to enable security.
-  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
-  this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, authorizerClass.getName)
+
+  def AclClusterAction = new AclBinding(clusterResource,
+    new AccessControlEntry(kafkaPrincipal.toString, "*", AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW))
+  def AclAlter = new AclBinding(clusterResource,
+    new AccessControlEntry(kafkaPrincipal.toString, "*", AclOperation.ALTER, AclPermissionType.ALLOW))
+
+  def AclTopicWrite(tptopicresource : ResourcePattern = topicResource) = new AclBinding(tptopicresource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.ALLOW))
+  def AclTopicCreate(tptopicresource : ResourcePattern = topicResource) = new AclBinding(tptopicresource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.CREATE, AclPermissionType.ALLOW))
+  def AclTopicDescribe(tptopicresource : ResourcePattern = topicResource) = new AclBinding(tptopicresource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
+  def AclTopicRead(tptopicresource : ResourcePattern = topicResource) = new AclBinding(tptopicresource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW))
+  def AclGroupRead = new AclBinding(groupResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW))
+
+  def AclWildcardTopicWrite = new AclBinding(wildcardTopicResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.ALLOW))
+  def AclWildcardTopicCreate = new AclBinding(wildcardTopicResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.CREATE, AclPermissionType.ALLOW))
+  def AclWildcardTopicDescribe = new AclBinding(wildcardTopicResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
+  def AclWildcardTopicRead = new AclBinding(wildcardTopicResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW))
+  def AclWildcardGroupRead = new AclBinding(wildcardGroupResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW))
+
+  def AclPrefixedTopicWrite = new AclBinding(prefixedTopicResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.ALLOW))
+  def AclPrefixedTopicCreate = new AclBinding(prefixedTopicResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.CREATE, AclPermissionType.ALLOW))
+  def AclPrefixedTopicDescribe = new AclBinding(prefixedTopicResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
+  def AclPrefixedTopicRead = new AclBinding(prefixedTopicResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW))
+  def AclPrefixedGroupRead = new AclBinding(prefixedGroupResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW))
+
+

Review Comment:
   Extra blank line (two consecutive blank lines added here; one is enough).



##########
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala:
##########
@@ -170,6 +187,19 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
     admin
   }
 
+

Review Comment:
   Extra blank line (the preceding line is already blank).



##########
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala:
##########
@@ -292,24 +255,35 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   }
 
   private def setWildcardResourceAcls(): Unit = {
-    AclCommand.main(produceConsumeWildcardAclArgs)
+    val superuserAdminClient = createSuperuserAdminClient()
+    superuserAdminClient.createAcls(List(AclWildcardTopicWrite, AclWildcardTopicCreate, AclWildcardTopicDescribe, AclWildcardTopicRead).asJava).values
+    superuserAdminClient.createAcls(List(AclWildcardGroupRead).asJava).values
+
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardTopicResource)
+      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardTopicResource)
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardGroupResource)
     }
   }
 
   private def setPrefixedResourceAcls(): Unit = {
-    AclCommand.main(produceConsumePrefixedAclsArgs)
+    val superuserAdminClient = createSuperuserAdminClient()
+    superuserAdminClient.createAcls(List(AclPrefixedTopicWrite, AclPrefixedTopicCreate, AclPrefixedTopicDescribe, AclPrefixedTopicRead).asJava).values
+    superuserAdminClient.createAcls(List(AclPrefixedGroupRead).asJava).values
+
     servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedTopicResource)
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedGroupResource)
     }
   }
 
   private def setReadAndWriteAcls(tp: TopicPartition): Unit = {
-    AclCommand.main(produceAclArgs(tp.topic))
-    AclCommand.main(consumeAclArgs(tp.topic))
+    val tptopicresource = new ResourcePattern(TOPIC, tp.topic, LITERAL)
+    val superuserAdminClient = createSuperuserAdminClient()
+
+    superuserAdminClient.createAcls(List(AclTopicWrite(tptopicresource), AclTopicCreate(tptopicresource), AclTopicDescribe(tptopicresource)).asJava).values
+    superuserAdminClient.createAcls(List(AclTopicRead(tptopicresource)).asJava).values

Review Comment:
   s/tptopicresource/topicResource/g



##########
core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala:
##########
@@ -76,6 +77,15 @@ class PlaintextEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
     super.setUp(testInfo)
   }
 
+  /*
+   * The principal used for all authenticated connections to listernerName is always clientPrincipal.

Review Comment:
   s/listernerName/listenerName/



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org