You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2024/01/31 14:38:21 UTC

(kafka) branch trunk updated: KAFKA-15853: Move AuthorizerUtils and its dependencies to server module (#15167)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 127fe7d2765 KAFKA-15853: Move AuthorizerUtils and its dependencies to server module (#15167)
127fe7d2765 is described below

commit 127fe7d2765cfdcee6243cbc62a77a7760b3c84c
Author: Omnia Ibrahim <o....@gmail.com>
AuthorDate: Wed Jan 31 14:38:14 2024 +0000

    KAFKA-15853: Move AuthorizerUtils and its dependencies to server module (#15167)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>
---
 checkstyle/import-control-server.xml               | 16 +++--
 core/src/main/scala/kafka/admin/AclCommand.scala   |  3 +-
 .../main/scala/kafka/network/RequestChannel.scala  | 11 +--
 .../scala/kafka/network/RequestConvertToJson.scala |  2 +-
 .../security/authorizer/AuthorizerUtils.scala      | 47 -------------
 core/src/main/scala/kafka/server/AclApis.scala     |  2 +-
 .../scala/kafka/server/ClientQuotaManager.scala    |  2 +-
 .../server/ControllerMutationQuotaManager.scala    |  2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  2 +-
 .../delegation/DelegationTokenManagerTest.scala    |  7 +-
 .../kafka/server/BaseClientQuotaManagerTest.scala  |  5 +-
 .../unit/kafka/server/ClientQuotaManagerTest.scala |  6 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  4 +-
 .../java/org/apache/kafka/network/Session.java     | 34 +++++++++
 .../kafka/security/authorizer/AuthorizerUtils.java | 81 ++++++++++++++++++++++
 15 files changed, 148 insertions(+), 76 deletions(-)

diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml
index c395a5f2214..0fad8c16273 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -18,7 +18,7 @@
  limitations under the License.
 -->
 
-<import-control pkg="org.apache.kafka.server">
+<import-control pkg="org.apache.kafka">
 
   <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
 
@@ -77,9 +77,17 @@
   <allow pkg="org.apache.kafka.common" />
   <allow pkg="org.apache.kafka.raft" />
 
-  <subpackage name="metrics">
-    <allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
-    <allow pkg="org.apache.kafka.server.telemetry" />
+  <subpackage name="server">
+    <subpackage name="metrics">
+      <allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
+      <allow pkg="org.apache.kafka.server.telemetry" />
+    </subpackage>
+  </subpackage>
+
+  <subpackage name="security">
+    <allow pkg="org.apache.kafka.common.resource" />
+    <allow pkg="org.apache.kafka.network" />
+    <allow pkg="org.apache.kafka.server.authorizer" />
   </subpackage>
 
 </import-control>
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index f835475fa9e..2e867d76dd2 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -20,7 +20,7 @@ package kafka.admin
 import java.util.Properties
 import joptsimple._
 import joptsimple.util.EnumConverter
-import kafka.security.authorizer.{AclAuthorizer, AclEntry, AuthorizerUtils}
+import kafka.security.authorizer.{AclAuthorizer, AclEntry}
 import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
@@ -31,6 +31,7 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceP
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Utils, SecurityUtils => JSecurityUtils}
+import org.apache.kafka.security.authorizer.AuthorizerUtils
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 6c08798031f..007049814cb 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -17,7 +17,6 @@
 
 package kafka.network
 
-import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.util.concurrent._
 import com.fasterxml.jackson.databind.JsonNode
@@ -34,8 +33,8 @@ import org.apache.kafka.common.message.EnvelopeResponseData
 import org.apache.kafka.common.network.{ClientInformation, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests._
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.{Sanitizer, Time}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.network.Session
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 
 import java.util
@@ -57,10 +56,6 @@ object RequestChannel extends Logging {
   case object ShutdownRequest extends BaseRequest
   case object WakeupRequest extends BaseRequest
 
-  case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) {
-    val sanitizedUser: String = Sanitizer.sanitize(principal.getName)
-  }
-
   class Metrics(enabledApis: Iterable[ApiKeys]) {
     def this(scope: ListenerType) = {
       this(ApiKeys.apisForListener(scope).asScala)
@@ -103,7 +98,7 @@ object RequestChannel extends Logging {
     @volatile var callbackRequestDequeueTimeNanos: Option[Long] = None
     @volatile var callbackRequestCompleteTimeNanos: Option[Long] = None
 
-    val session = Session(context.principal, context.clientAddress)
+    val session = new Session(context.principal, context.clientAddress)
 
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
index bf120376342..cd42b17809b 100644
--- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala
+++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
@@ -19,10 +19,10 @@ package kafka.network
 
 import com.fasterxml.jackson.databind.JsonNode
 import com.fasterxml.jackson.databind.node.{BooleanNode, DoubleNode, JsonNodeFactory, LongNode, ObjectNode, TextNode}
-import kafka.network.RequestChannel.Session
 import org.apache.kafka.common.message._
 import org.apache.kafka.common.network.ClientInformation
 import org.apache.kafka.common.requests._
+import org.apache.kafka.network.Session
 
 object RequestConvertToJson {
   def request(request: AbstractRequest): JsonNode = {
diff --git a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
deleted file mode 100644
index 0e417d677eb..00000000000
--- a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.security.authorizer
-
-import java.net.InetAddress
-
-import kafka.network.RequestChannel.Session
-import org.apache.kafka.common.resource.Resource
-import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer}
-
-
-object AuthorizerUtils {
-
-  def createAuthorizer(className: String): Authorizer = Utils.newInstance(className, classOf[Authorizer])
-
-  def isClusterResource(name: String): Boolean = name.equals(Resource.CLUSTER_NAME)
-
-  def sessionToRequestContext(session: Session): AuthorizableRequestContext = {
-    new AuthorizableRequestContext {
-      override def clientId(): String = ""
-      override def requestType(): Int = -1
-      override def listenerName(): String = ""
-      override def clientAddress(): InetAddress = session.clientAddress
-      override def principal(): KafkaPrincipal = session.principal
-      override def securityProtocol(): SecurityProtocol = null
-      override def correlationId(): Int = -1
-      override def requestVersion(): Int = -1
-    }
-  }
-}
diff --git a/core/src/main/scala/kafka/server/AclApis.scala b/core/src/main/scala/kafka/server/AclApis.scala
index 485cafeca20..fd58bc8ba83 100644
--- a/core/src/main/scala/kafka/server/AclApis.scala
+++ b/core/src/main/scala/kafka/server/AclApis.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import kafka.network.RequestChannel
-import kafka.security.authorizer.AuthorizerUtils
 import kafka.utils.Logging
 import org.apache.kafka.common.acl.AclOperation._
 import org.apache.kafka.common.acl.AclBinding
@@ -30,6 +29,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
 import org.apache.kafka.common.resource.ResourceType
+import org.apache.kafka.security.authorizer.AuthorizerUtils
 import org.apache.kafka.server.authorizer._
 
 import java.util
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 5169509ea21..a10832d9690 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import kafka.network.RequestChannel
-import kafka.network.RequestChannel._
 import kafka.server.ClientQuotaManager._
 import kafka.utils.{Logging, QuotaUtils}
 import org.apache.kafka.common.{Cluster, MetricName}
@@ -33,6 +32,7 @@ import org.apache.kafka.common.utils.{Sanitizer, Time}
 import org.apache.kafka.server.config.{ConfigEntityName, ClientQuotaManagerConfig}
 import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType}
 import org.apache.kafka.server.util.ShutdownableThread
+import org.apache.kafka.network.Session
 
 import scala.jdk.CollectionConverters._
 
diff --git a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
index b12d5aa4595..e1d38eb65ee 100644
--- a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
@@ -17,7 +17,6 @@
 package kafka.server
 
 import kafka.network.RequestChannel
-import kafka.network.RequestChannel.Session
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
 import org.apache.kafka.common.metrics.Metrics
@@ -27,6 +26,7 @@ import org.apache.kafka.common.metrics.stats.Rate
 import org.apache.kafka.common.metrics.stats.TokenBucket
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.network.Session
 import org.apache.kafka.server.quota.ClientQuotaCallback
 import org.apache.kafka.server.config.ClientQuotaManagerConfig
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a9cda78dd0f..f7967ed8457 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -21,7 +21,6 @@ import java.{lang, util}
 import java.util.concurrent.TimeUnit
 import java.util.{Collections, Properties}
 import kafka.cluster.EndPoint
-import kafka.security.authorizer.AuthorizerUtils
 import kafka.server.KafkaConfig.{ControllerListenerNamesProp, ListenerSecurityProtocolMapProp}
 import kafka.utils.CoreUtils.parseCsvList
 import kafka.utils.{CoreUtils, Logging}
@@ -40,6 +39,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.group.Group.GroupType
 import org.apache.kafka.coordinator.group.assignor.PartitionAssignor
 import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.security.authorizer.AuthorizerUtils
 import org.apache.kafka.security.PasswordEncoderConfigs
 import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.authorizer.Authorizer
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
index f829ce3570b..47f6783f2b9 100644
--- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -20,8 +20,7 @@ package kafka.security.token.delegation
 import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.util.{Base64, Properties}
-import kafka.network.RequestChannel.Session
-import kafka.security.authorizer.{AclAuthorizer, AuthorizerUtils}
+import kafka.security.authorizer.AclAuthorizer
 import kafka.security.authorizer.AclEntry.WildcardHost
 import kafka.server.{CreateTokenResult, DelegationTokenManager, DelegationTokenManagerZk, KafkaConfig, QuorumTestHarness}
 import kafka.utils.TestUtils
@@ -38,6 +37,8 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{MockTime, SecurityUtils, Time}
+import org.apache.kafka.network.Session
+import org.apache.kafka.security.authorizer.AuthorizerUtils
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.config.Defaults
 import org.junit.jupiter.api.Assertions._
@@ -316,7 +317,7 @@ class DelegationTokenManagerTest extends QuorumTestHarness  {
     assertEquals(1, tokens.size)
 
     //get all tokens for multiple owners (renewer2, renewer3) which are token renewers principals and with permissions
-    hostSession = Session(renewer2, InetAddress.getByName("192.168.1.1"))
+    hostSession = new Session(renewer2, InetAddress.getByName("192.168.1.1"))
     createAcl(new AclBinding(new ResourcePattern(DELEGATION_TOKEN, tokenId2, LITERAL),
       new AccessControlEntry(renewer2.toString, WildcardHost, DESCRIBE, ALLOW)))
     tokens = getTokens(tokenManager, aclAuthorizer, hostSession,  renewer2, List(renewer2, renewer3))
diff --git a/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala
index edda22ff17c..af4e08f5586 100644
--- a/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala
@@ -19,9 +19,7 @@ package kafka.server
 import java.net.InetAddress
 import java.util
 import java.util.Collections
-
 import kafka.network.RequestChannel
-import kafka.network.RequestChannel.Session
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.metrics.{MetricConfig, Metrics}
@@ -31,6 +29,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, RequestContext, RequestHeader}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.network.Session
 import org.junit.jupiter.api.AfterEach
 import org.mockito.Mockito.mock
 
@@ -69,7 +68,7 @@ class BaseClientQuotaManagerTest {
 
   protected def buildSession(user: String): Session = {
     val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user)
-    Session(principal, null)
+    new Session(principal, null)
   }
 
   protected def maybeRecord(quotaManager: ClientQuotaManager, user: String, clientId: String, value: Double): Int = {
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 1dd1823b0f1..e67203c565f 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -17,12 +17,12 @@
 package kafka.server
 
 import java.net.InetAddress
-import kafka.network.RequestChannel.Session
 import kafka.server.QuotaType._
 import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.server.config.{ClientQuotaManagerConfig, ConfigEntityName}
+import org.apache.kafka.network.Session
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 
@@ -143,7 +143,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
 
   private def checkQuota(quotaManager: ClientQuotaManager, user: String, clientId: String, expectedBound: Long, value: Int, expectThrottle: Boolean): Unit = {
     assertEquals(expectedBound.toDouble, quotaManager.quota(user, clientId).bound, 0.0)
-    val session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), InetAddress.getLocalHost)
+    val session = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), InetAddress.getLocalHost)
     val expectedMaxValueInQuotaWindow =
       if (expectedBound < Long.MaxValue) config.quotaWindowSizeSeconds * (config.numQuotaSamples - 1) * expectedBound.toDouble
       else Double.MaxValue
@@ -161,7 +161,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
     val numFullQuotaWindows = 3   // 3 seconds window (vs. 10 seconds default)
     val nonDefaultConfig = new ClientQuotaManagerConfig(numFullQuotaWindows + 1)
     val clientQuotaManager = new ClientQuotaManager(nonDefaultConfig, metrics, Fetch, time, "")
-    val userSession = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "userA"), InetAddress.getLocalHost)
+    val userSession = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "userA"), InetAddress.getLocalHost)
 
     try {
       // no quota set
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index d765dda5a0a..5ced592137c 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -15,7 +15,6 @@
 package kafka.server
 
 import kafka.api.LeaderAndIsr
-import kafka.network.RequestChannel.Session
 import kafka.security.authorizer.AclAuthorizer
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.common._
@@ -43,6 +42,7 @@ import org.apache.kafka.common.resource.{PatternType, ResourceType => AdminResou
 import org.apache.kafka.common.security.auth._
 import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils}
 import org.apache.kafka.metadata.authorizer.StandardAuthorizer
+import org.apache.kafka.network.Session
 import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
@@ -223,7 +223,7 @@ class RequestQuotaTest extends BaseRequestTest {
     }
   }
 
-  def session(user: String): Session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), null)
+  def session(user: String): Session = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), null)
 
   private def throttleTimeMetricValue(clientId: String): Double = {
     throttleTimeMetricValueForQuotaType(clientId, QuotaType.Request)
diff --git a/server/src/main/java/org/apache/kafka/network/Session.java b/server/src/main/java/org/apache/kafka/network/Session.java
new file mode 100644
index 00000000000..1793e4d565c
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/network/Session.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Sanitizer;
+
+import java.net.InetAddress;
+
+public class Session {
+    public final KafkaPrincipal principal;
+    public final InetAddress clientAddress;
+    public final String sanitizedUser;
+
+    public Session(KafkaPrincipal principal, InetAddress clientAddress) {
+        this.principal = principal;
+        this.clientAddress = clientAddress;
+        this.sanitizedUser = Sanitizer.sanitize(principal.getName());
+    }
+}
diff --git a/server/src/main/java/org/apache/kafka/security/authorizer/AuthorizerUtils.java b/server/src/main/java/org/apache/kafka/security/authorizer/AuthorizerUtils.java
new file mode 100644
index 00000000000..763f156158a
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/security/authorizer/AuthorizerUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security.authorizer;
+
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.network.Session;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.Authorizer;
+
+import java.net.InetAddress;
+
+public class AuthorizerUtils {
+    public static Authorizer createAuthorizer(String className) throws ClassNotFoundException {
+        return Utils.newInstance(className, Authorizer.class);
+    }
+
+    public static boolean isClusterResource(String name) {
+        return name.equals(Resource.CLUSTER_NAME);
+    }
+
+    public static AuthorizableRequestContext sessionToRequestContext(Session session) {
+        return new AuthorizableRequestContext() {
+            @Override
+            public String clientId() {
+                return "";
+            }
+
+            @Override
+            public int requestType() {
+                return -1;
+            }
+
+            @Override
+            public String listenerName() {
+                return "";
+            }
+
+            @Override
+            public InetAddress clientAddress() {
+                return session.clientAddress;
+            }
+
+            @Override
+            public KafkaPrincipal principal() {
+                return session.principal;
+            }
+
+            @Override
+            public SecurityProtocol securityProtocol() {
+                return null;
+            }
+
+            @Override
+            public int correlationId() {
+                return -1;
+            }
+
+            @Override
+            public int requestVersion() {
+                return -1;
+            }
+        };
+    }
+}