You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text rendering.
Posted to commits@kafka.apache.org by mi...@apache.org on 2024/01/31 14:38:21 UTC
(kafka) branch trunk updated: KAFKA-15853: Move AuthorizerUtils and its dependencies to server module (#15167)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 127fe7d2765 KAFKA-15853: Move AuthorizerUtils and its dependencies to server module (#15167)
127fe7d2765 is described below
commit 127fe7d2765cfdcee6243cbc62a77a7760b3c84c
Author: Omnia Ibrahim <o....@gmail.com>
AuthorDate: Wed Jan 31 14:38:14 2024 +0000
KAFKA-15853: Move AuthorizerUtils and its dependencies to server module (#15167)
Reviewers: Mickael Maison <mi...@gmail.com>
---
checkstyle/import-control-server.xml | 16 +++--
core/src/main/scala/kafka/admin/AclCommand.scala | 3 +-
.../main/scala/kafka/network/RequestChannel.scala | 11 +--
.../scala/kafka/network/RequestConvertToJson.scala | 2 +-
.../security/authorizer/AuthorizerUtils.scala | 47 -------------
core/src/main/scala/kafka/server/AclApis.scala | 2 +-
.../scala/kafka/server/ClientQuotaManager.scala | 2 +-
.../server/ControllerMutationQuotaManager.scala | 2 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +-
.../delegation/DelegationTokenManagerTest.scala | 7 +-
.../kafka/server/BaseClientQuotaManagerTest.scala | 5 +-
.../unit/kafka/server/ClientQuotaManagerTest.scala | 6 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 4 +-
.../java/org/apache/kafka/network/Session.java | 34 +++++++++
.../kafka/security/authorizer/AuthorizerUtils.java | 81 ++++++++++++++++++++++
15 files changed, 148 insertions(+), 76 deletions(-)
diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml
index c395a5f2214..0fad8c16273 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -18,7 +18,7 @@
limitations under the License.
-->
-<import-control pkg="org.apache.kafka.server">
+<import-control pkg="org.apache.kafka">
<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
@@ -77,9 +77,17 @@
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.raft" />
- <subpackage name="metrics">
- <allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
- <allow pkg="org.apache.kafka.server.telemetry" />
+ <subpackage name="server">
+ <subpackage name="metrics">
+ <allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
+ <allow pkg="org.apache.kafka.server.telemetry" />
+ </subpackage>
+ </subpackage>
+
+ <subpackage name="security">
+ <allow pkg="org.apache.kafka.common.resource" />
+ <allow pkg="org.apache.kafka.network" />
+ <allow pkg="org.apache.kafka.server.authorizer" />
</subpackage>
</import-control>
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index f835475fa9e..2e867d76dd2 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -20,7 +20,7 @@ package kafka.admin
import java.util.Properties
import joptsimple._
import joptsimple.util.EnumConverter
-import kafka.security.authorizer.{AclAuthorizer, AclEntry, AuthorizerUtils}
+import kafka.security.authorizer.{AclAuthorizer, AclEntry}
import kafka.server.KafkaConfig
import kafka.utils._
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
@@ -31,6 +31,7 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceP
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Utils, SecurityUtils => JSecurityUtils}
+import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 6c08798031f..007049814cb 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -17,7 +17,6 @@
package kafka.network
-import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.concurrent._
import com.fasterxml.jackson.databind.JsonNode
@@ -34,8 +33,8 @@ import org.apache.kafka.common.message.EnvelopeResponseData
import org.apache.kafka.common.network.{ClientInformation, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.{Sanitizer, Time}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.network.Session
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import java.util
@@ -57,10 +56,6 @@ object RequestChannel extends Logging {
case object ShutdownRequest extends BaseRequest
case object WakeupRequest extends BaseRequest
- case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) {
- val sanitizedUser: String = Sanitizer.sanitize(principal.getName)
- }
-
class Metrics(enabledApis: Iterable[ApiKeys]) {
def this(scope: ListenerType) = {
this(ApiKeys.apisForListener(scope).asScala)
@@ -103,7 +98,7 @@ object RequestChannel extends Logging {
@volatile var callbackRequestDequeueTimeNanos: Option[Long] = None
@volatile var callbackRequestCompleteTimeNanos: Option[Long] = None
- val session = Session(context.principal, context.clientAddress)
+ val session = new Session(context.principal, context.clientAddress)
private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
index bf120376342..cd42b17809b 100644
--- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala
+++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
@@ -19,10 +19,10 @@ package kafka.network
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.{BooleanNode, DoubleNode, JsonNodeFactory, LongNode, ObjectNode, TextNode}
-import kafka.network.RequestChannel.Session
import org.apache.kafka.common.message._
import org.apache.kafka.common.network.ClientInformation
import org.apache.kafka.common.requests._
+import org.apache.kafka.network.Session
object RequestConvertToJson {
def request(request: AbstractRequest): JsonNode = {
diff --git a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
deleted file mode 100644
index 0e417d677eb..00000000000
--- a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.security.authorizer
-
-import java.net.InetAddress
-
-import kafka.network.RequestChannel.Session
-import org.apache.kafka.common.resource.Resource
-import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer}
-
-
-object AuthorizerUtils {
-
- def createAuthorizer(className: String): Authorizer = Utils.newInstance(className, classOf[Authorizer])
-
- def isClusterResource(name: String): Boolean = name.equals(Resource.CLUSTER_NAME)
-
- def sessionToRequestContext(session: Session): AuthorizableRequestContext = {
- new AuthorizableRequestContext {
- override def clientId(): String = ""
- override def requestType(): Int = -1
- override def listenerName(): String = ""
- override def clientAddress(): InetAddress = session.clientAddress
- override def principal(): KafkaPrincipal = session.principal
- override def securityProtocol(): SecurityProtocol = null
- override def correlationId(): Int = -1
- override def requestVersion(): Int = -1
- }
- }
-}
diff --git a/core/src/main/scala/kafka/server/AclApis.scala b/core/src/main/scala/kafka/server/AclApis.scala
index 485cafeca20..fd58bc8ba83 100644
--- a/core/src/main/scala/kafka/server/AclApis.scala
+++ b/core/src/main/scala/kafka/server/AclApis.scala
@@ -18,7 +18,6 @@
package kafka.server
import kafka.network.RequestChannel
-import kafka.security.authorizer.AuthorizerUtils
import kafka.utils.Logging
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclBinding
@@ -30,6 +29,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType
+import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.authorizer._
import java.util
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 5169509ea21..a10832d9690 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.network.RequestChannel
-import kafka.network.RequestChannel._
import kafka.server.ClientQuotaManager._
import kafka.utils.{Logging, QuotaUtils}
import org.apache.kafka.common.{Cluster, MetricName}
@@ -33,6 +32,7 @@ import org.apache.kafka.common.utils.{Sanitizer, Time}
import org.apache.kafka.server.config.{ConfigEntityName, ClientQuotaManagerConfig}
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType}
import org.apache.kafka.server.util.ShutdownableThread
+import org.apache.kafka.network.Session
import scala.jdk.CollectionConverters._
diff --git a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
index b12d5aa4595..e1d38eb65ee 100644
--- a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
@@ -17,7 +17,6 @@
package kafka.server
import kafka.network.RequestChannel
-import kafka.network.RequestChannel.Session
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
import org.apache.kafka.common.metrics.Metrics
@@ -27,6 +26,7 @@ import org.apache.kafka.common.metrics.stats.Rate
import org.apache.kafka.common.metrics.stats.TokenBucket
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.network.Session
import org.apache.kafka.server.quota.ClientQuotaCallback
import org.apache.kafka.server.config.ClientQuotaManagerConfig
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a9cda78dd0f..f7967ed8457 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -21,7 +21,6 @@ import java.{lang, util}
import java.util.concurrent.TimeUnit
import java.util.{Collections, Properties}
import kafka.cluster.EndPoint
-import kafka.security.authorizer.AuthorizerUtils
import kafka.server.KafkaConfig.{ControllerListenerNamesProp, ListenerSecurityProtocolMapProp}
import kafka.utils.CoreUtils.parseCsvList
import kafka.utils.{CoreUtils, Logging}
@@ -40,6 +39,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor
import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.security.PasswordEncoderConfigs
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
index f829ce3570b..47f6783f2b9 100644
--- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -20,8 +20,7 @@ package kafka.security.token.delegation
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.{Base64, Properties}
-import kafka.network.RequestChannel.Session
-import kafka.security.authorizer.{AclAuthorizer, AuthorizerUtils}
+import kafka.security.authorizer.AclAuthorizer
import kafka.security.authorizer.AclEntry.WildcardHost
import kafka.server.{CreateTokenResult, DelegationTokenManager, DelegationTokenManagerZk, KafkaConfig, QuorumTestHarness}
import kafka.utils.TestUtils
@@ -38,6 +37,8 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{MockTime, SecurityUtils, Time}
+import org.apache.kafka.network.Session
+import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.config.Defaults
import org.junit.jupiter.api.Assertions._
@@ -316,7 +317,7 @@ class DelegationTokenManagerTest extends QuorumTestHarness {
assertEquals(1, tokens.size)
//get all tokens for multiple owners (renewer2, renewer3) which are token renewers principals and with permissions
- hostSession = Session(renewer2, InetAddress.getByName("192.168.1.1"))
+ hostSession = new Session(renewer2, InetAddress.getByName("192.168.1.1"))
createAcl(new AclBinding(new ResourcePattern(DELEGATION_TOKEN, tokenId2, LITERAL),
new AccessControlEntry(renewer2.toString, WildcardHost, DESCRIBE, ALLOW)))
tokens = getTokens(tokenManager, aclAuthorizer, hostSession, renewer2, List(renewer2, renewer3))
diff --git a/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala
index edda22ff17c..af4e08f5586 100644
--- a/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala
@@ -19,9 +19,7 @@ package kafka.server
import java.net.InetAddress
import java.util
import java.util.Collections
-
import kafka.network.RequestChannel
-import kafka.network.RequestChannel.Session
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.metrics.{MetricConfig, Metrics}
@@ -31,6 +29,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, RequestContext, RequestHeader}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.network.Session
import org.junit.jupiter.api.AfterEach
import org.mockito.Mockito.mock
@@ -69,7 +68,7 @@ class BaseClientQuotaManagerTest {
protected def buildSession(user: String): Session = {
val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user)
- Session(principal, null)
+ new Session(principal, null)
}
protected def maybeRecord(quotaManager: ClientQuotaManager, user: String, clientId: String, value: Double): Int = {
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 1dd1823b0f1..e67203c565f 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -17,12 +17,12 @@
package kafka.server
import java.net.InetAddress
-import kafka.network.RequestChannel.Session
import kafka.server.QuotaType._
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.server.config.{ClientQuotaManagerConfig, ConfigEntityName}
+import org.apache.kafka.network.Session
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -143,7 +143,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
private def checkQuota(quotaManager: ClientQuotaManager, user: String, clientId: String, expectedBound: Long, value: Int, expectThrottle: Boolean): Unit = {
assertEquals(expectedBound.toDouble, quotaManager.quota(user, clientId).bound, 0.0)
- val session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), InetAddress.getLocalHost)
+ val session = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), InetAddress.getLocalHost)
val expectedMaxValueInQuotaWindow =
if (expectedBound < Long.MaxValue) config.quotaWindowSizeSeconds * (config.numQuotaSamples - 1) * expectedBound.toDouble
else Double.MaxValue
@@ -161,7 +161,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val numFullQuotaWindows = 3 // 3 seconds window (vs. 10 seconds default)
val nonDefaultConfig = new ClientQuotaManagerConfig(numFullQuotaWindows + 1)
val clientQuotaManager = new ClientQuotaManager(nonDefaultConfig, metrics, Fetch, time, "")
- val userSession = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "userA"), InetAddress.getLocalHost)
+ val userSession = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "userA"), InetAddress.getLocalHost)
try {
// no quota set
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index d765dda5a0a..5ced592137c 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -15,7 +15,6 @@
package kafka.server
import kafka.api.LeaderAndIsr
-import kafka.network.RequestChannel.Session
import kafka.security.authorizer.AclAuthorizer
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.common._
@@ -43,6 +42,7 @@ import org.apache.kafka.common.resource.{PatternType, ResourceType => AdminResou
import org.apache.kafka.common.security.auth._
import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils}
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
+import org.apache.kafka.network.Session
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
@@ -223,7 +223,7 @@ class RequestQuotaTest extends BaseRequestTest {
}
}
- def session(user: String): Session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), null)
+ def session(user: String): Session = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), null)
private def throttleTimeMetricValue(clientId: String): Double = {
throttleTimeMetricValueForQuotaType(clientId, QuotaType.Request)
diff --git a/server/src/main/java/org/apache/kafka/network/Session.java b/server/src/main/java/org/apache/kafka/network/Session.java
new file mode 100644
index 00000000000..1793e4d565c
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/network/Session.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Sanitizer;
+
+import java.net.InetAddress;
+
+public class Session {
+ public final KafkaPrincipal principal;
+ public final InetAddress clientAddress;
+ public final String sanitizedUser;
+
+ public Session(KafkaPrincipal principal, InetAddress clientAddress) {
+ this.principal = principal;
+ this.clientAddress = clientAddress;
+ this.sanitizedUser = Sanitizer.sanitize(principal.getName());
+ }
+}
diff --git a/server/src/main/java/org/apache/kafka/security/authorizer/AuthorizerUtils.java b/server/src/main/java/org/apache/kafka/security/authorizer/AuthorizerUtils.java
new file mode 100644
index 00000000000..763f156158a
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/security/authorizer/AuthorizerUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security.authorizer;
+
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.network.Session;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.Authorizer;
+
+import java.net.InetAddress;
+
+public class AuthorizerUtils {
+ public static Authorizer createAuthorizer(String className) throws ClassNotFoundException {
+ return Utils.newInstance(className, Authorizer.class);
+ }
+
+ public static boolean isClusterResource(String name) {
+ return name.equals(Resource.CLUSTER_NAME);
+ }
+
+ public static AuthorizableRequestContext sessionToRequestContext(Session session) {
+ return new AuthorizableRequestContext() {
+ @Override
+ public String clientId() {
+ return "";
+ }
+
+ @Override
+ public int requestType() {
+ return -1;
+ }
+
+ @Override
+ public String listenerName() {
+ return "";
+ }
+
+ @Override
+ public InetAddress clientAddress() {
+ return session.clientAddress;
+ }
+
+ @Override
+ public KafkaPrincipal principal() {
+ return session.principal;
+ }
+
+ @Override
+ public SecurityProtocol securityProtocol() {
+ return null;
+ }
+
+ @Override
+ public int correlationId() {
+ return -1;
+ }
+
+ @Override
+ public int requestVersion() {
+ return -1;
+ }
+ };
+ }
+}