Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/08 21:01:50 UTC

[GitHub] [kafka] mumrah opened a new pull request #9715: Upstream ApisUtils from kip-500

mumrah opened a new pull request #9715:
URL: https://github.com/apache/kafka/pull/9715


   In the KIP-500 development branch, we have a separate ControllerApis that shares a lot of functionality with KafkaApis. We introduced a utility class ApisUtils to pull out the common code. Some things were moved to RequestChannel as well.
   
   We'd like to upstream this work now so we don't continue to diverge (since KafkaApis is a frequently modified class). There should be no logical changes in this PR, only shuffling code around.
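
   Roughly, the shape being described, as a standalone sketch (hypothetical names, heavily simplified; not the actual PR code):

   ```scala
   // Two request handlers reuse one shared helper instead of duplicating the logic.
   class SharedApisUtils {
     def errorMessage(api: String, e: Throwable): String =
       s"Error when handling $api request: ${e.getMessage}"
   }

   class DemoKafkaApis(utils: SharedApisUtils) {
     def handle(api: String): String =
       try throw new IllegalStateException("boom") // stand-in for real handling
       catch { case e: Exception => utils.errorMessage(api, e) }
   }

   class DemoControllerApis(utils: SharedApisUtils) {
     def handle(api: String): String =
       utils.errorMessage(api, new UnsupportedOperationException("not implemented"))
   }

   object SharedUtilsDemo extends App {
     val utils = new SharedApisUtils
     println(new DemoKafkaApis(utils).handle("Fetch"))
     println(new DemoControllerApis(utils).handle("Vote"))
   }
   ```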


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#issuecomment-741791066


   @chia7712, thanks for the feedback. I've incorporated your suggestions; can you take another look?


----------------------------------------------------------------



[GitHub] [kafka] jsancio commented on a change in pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#discussion_r557669397



##########
File path: core/src/main/scala/kafka/server/AuthHelper.scala
##########
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.lang.{Byte => JByte}
+import java.util.Collections
+
+import kafka.network.RequestChannel
+import kafka.security.authorizer.AclEntry
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.requests.RequestContext
+import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
+import org.apache.kafka.common.resource.ResourceType.CLUSTER
+import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
+
+import scala.jdk.CollectionConverters._
+
+
+class AuthHelper(val requestChannel: RequestChannel,
+                 val authorizer: Option[Authorizer]) {

Review comment:
       Adding `val` to the constructor arguments makes them `public` members. Glancing at the code, it doesn't look like this is needed.
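
   A standalone illustration of the point (hypothetical class names, not from the PR):

   ```scala
   // A plain constructor parameter is only visible inside the class body,
   // while `val` turns it into a public member with an accessor.
   class PlainParam(channel: String) {
     def describe: String = s"uses $channel" // `channel` usable here only
   }

   class ValParam(val channel: String) // `channel` becomes a public member

   object ConstructorParamDemo extends App {
     // new PlainParam("data-plane").channel  // does not compile: not a member
     println(new ValParam("data-plane").channel) // prints "data-plane"
   }
   ```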

##########
File path: core/src/main/scala/kafka/server/RequestHandlerHelper.scala
##########
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.cluster.Partition
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+import kafka.network.RequestChannel
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Logging
+import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.network.Send
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse}
+import org.apache.kafka.common.utils.Time
+
+
+object RequestHandlerHelper {
+
+  def onLeadershipChange(groupCoordinator: GroupCoordinator,
+                         txnCoordinator: TransactionCoordinator,
+                         updatedLeaders: Iterable[Partition],
+                         updatedFollowers: Iterable[Partition]): Unit = {
+    // for each new leader or follower, call coordinator to handle consumer group migration.
+    // this callback is invoked under the replica state change lock to ensure proper order of
+    // leadership changes
+    updatedLeaders.foreach { partition =>
+      if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
+        groupCoordinator.onElection(partition.partitionId)
+      else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
+        txnCoordinator.onElection(partition.partitionId, partition.getLeaderEpoch)
+    }
+
+    updatedFollowers.foreach { partition =>
+      if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
+        groupCoordinator.onResignation(partition.partitionId)
+      else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
+        txnCoordinator.onResignation(partition.partitionId, Some(partition.getLeaderEpoch))
+    }
+  }
+}
+
+
+
+class RequestHandlerHelper(val requestChannel: RequestChannel,
+                           val quotas: QuotaManagers,
+                           val time: Time,
+                           val logPrefix: String) extends Logging {

Review comment:
       Adding `val` to the constructor arguments makes them `public` members. Glancing at the code it doesn't look like this is needed.




----------------------------------------------------------------



[GitHub] [kafka] chia7712 commented on a change in pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#discussion_r539362958



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -432,6 +432,43 @@ class RequestChannel(val queueSize: Int,
     }
   }
 
+  def sendResponse(request: RequestChannel.Request,
+                   responseOpt: Option[AbstractResponse],
+                   onComplete: Option[Send => Unit]): Unit = {
+    // Update error metrics for each error code in the response including Errors.NONE
+    responseOpt.foreach(response => updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala))
+
+    val response = responseOpt match {
+      case Some(response) =>
+        new RequestChannel.SendResponse(
+          request,
+          request.buildResponseSend(response),
+          request.responseString(response),
+          onComplete
+        )
+      case None =>
+        new RequestChannel.NoOpResponse(request)
+    }
+
+    sendResponse(response)
+  }
+
+  def sendErrorOrCloseConnection(request: RequestChannel.Request, error: Throwable, throttleMs: Int): Unit = {

Review comment:
       Just curious: why is this method located in ```RequestChannel``` rather than ```ApisUtils```?

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -369,15 +369,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
             KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
 
         /* start processing requests */
-        dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
+        dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel,

Review comment:
       Is this change still valid?

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -369,15 +369,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
             KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
 
         /* start processing requests */
-        dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
+        dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel,
+          replicaManager, adminManager, groupCoordinator, transactionCoordinator,
           kafkaController, forwardingManager, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
           fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
 
         dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
           config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
 
         socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
-          controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
+          controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel,

Review comment:
       ditto

##########
File path: core/src/main/scala/kafka/server/KafkaRequestHandler.scala
##########
@@ -34,6 +34,10 @@ trait ApiRequestHandler {
   def handle(request: RequestChannel.Request): Unit
 }
 
+trait BaseApis extends ApiRequestHandler {

Review comment:
       BTW, my thought was 
   
   ```scala
   trait ApisUtils extends Logging {
     this: KafkaApis =>
   ```
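
   A standalone sketch of how that self-type approach reads (hypothetical DemoApis/DemoHelpers names, not from the PR):

   ```scala
   // The self-type says this trait can only be mixed into a DemoApis,
   // so it can use DemoApis members (e.g. brokerId) without declaring them.
   trait DemoHelpers {
     this: DemoApis =>
     def logPrefix: String = s"[broker=$brokerId]"
   }

   class DemoApis(val brokerId: Int) extends DemoHelpers

   object SelfTypeDemo extends App {
     println(new DemoApis(0).logPrefix) // prints "[broker=0]"
   }
   ```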

##########
File path: core/src/main/scala/kafka/server/KafkaRequestHandler.scala
##########
@@ -34,6 +34,10 @@ trait ApiRequestHandler {
   def handle(request: RequestChannel.Request): Unit
 }
 
+trait BaseApis extends ApiRequestHandler {

Review comment:
       I'm not sure whether ```BaseApis``` is required. ```KafkaApis``` can still extend ```ApiRequestHandler```, since you have added the required variables to ```ApisUtils```:
   ```scala
     val requestChannel: RequestChannel
     val quotas: QuotaManagers
     val time: Time
     val authorizer: Option[Authorizer]
   ```




----------------------------------------------------------------



[GitHub] [kafka] mumrah commented on a change in pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#discussion_r539437410



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -98,6 +97,7 @@ import scala.annotation.nowarn
  * Logic to handle the various Kafka requests
  */
 class KafkaApis(val requestChannel: RequestChannel,
+                val apisUtils: ApisUtils,

Review comment:
       1. Agreed. I don't really like the name, but can't think of a better one.
   
   2. Originally, I had hoped to remove some dependencies from KafkaApis when pulling out these methods, but it turns out they are all needed for other things as well. In the end it just adds a new dependency (which is fine). Seeing how I had to mess around with the type hierarchy to not break or complicate TestRaftRequestHandler, I'm leaning towards the delegation approach we had before.




----------------------------------------------------------------



[GitHub] [kafka] mumrah commented on a change in pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#discussion_r539295919



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -98,6 +97,7 @@ import scala.annotation.nowarn
  * Logic to handle the various Kafka requests
  */
 class KafkaApis(val requestChannel: RequestChannel,
+                val apisUtils: ApisUtils,

Review comment:
       Thanks, @chia7712, I like this suggestion. I'll see if I can figure this out (I wasn't too happy with the utility class needing to be instantiated either).




----------------------------------------------------------------



[GitHub] [kafka] mumrah commented on pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#issuecomment-741089348


   @abbccdda, can you take a look at the forwarding parts?


----------------------------------------------------------------



[GitHub] [kafka] mumrah commented on a change in pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#discussion_r539329604



##########
File path: core/src/main/scala/kafka/server/KafkaRequestHandler.scala
##########
@@ -34,6 +34,10 @@ trait ApiRequestHandler {
   def handle(request: RequestChannel.Request): Unit
 }
 
+trait BaseApis extends ApiRequestHandler {

Review comment:
       I added this trait to avoid polluting TestRaftRequestHandler with the ApisUtils dependencies.




----------------------------------------------------------------



[GitHub] [kafka] chia7712 commented on a change in pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#discussion_r539008999



##########
File path: core/src/main/scala/kafka/server/ApisUtils.scala
##########
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.lang.{Byte => JByte}
+import java.util.Collections
+
+import kafka.network.RequestChannel
+import kafka.security.authorizer.AclEntry
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Logging
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.network.Send
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestContext}
+import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
+import org.apache.kafka.common.resource.ResourceType.CLUSTER
+import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Helper class for request handlers. Provides common functionality around throttling, authorizations, and error handling
+ */
+class ApisUtils(val requestChannel: RequestChannel,
+                val authorizer: Option[Authorizer],
+                val quotas: QuotaManagers,
+                val time: Time) extends Logging {
+
+  // private package for testing
+  def authorize(requestContext: RequestContext,
+                operation: AclOperation,
+                resourceType: ResourceType,
+                resourceName: String,
+                logIfAllowed: Boolean = true,
+                logIfDenied: Boolean = true,
+                refCount: Int = 1): Boolean = {
+    authorizer.forall { authZ =>
+      val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
+      val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
+      authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
+    }
+  }
+
+  def authorizeClusterOperation(request: RequestChannel.Request, operation: AclOperation): Unit = {
+    if (!authorize(request.context, operation, CLUSTER, CLUSTER_NAME))
+      throw new ClusterAuthorizationException(s"Request $request is not authorized.")
+  }
+
+  def authorizedOperations(request: RequestChannel.Request, resource: Resource): Int = {
+    val supportedOps = AclEntry.supportedOperations(resource.resourceType).toList
+    val authorizedOps = authorizer match {
+      case Some(authZ) =>
+        val resourcePattern = new ResourcePattern(resource.resourceType, resource.name, PatternType.LITERAL)
+        val actions = supportedOps.map { op => new Action(op, resourcePattern, 1, false, false) }
+        authZ.authorize(request.context, actions.asJava).asScala
+          .zip(supportedOps)
+          .filter(_._1 == AuthorizationResult.ALLOWED)
+          .map(_._2).toSet
+      case None =>
+        supportedOps.toSet
+    }
+    Utils.to32BitField(authorizedOps.map(operation => operation.code.asInstanceOf[JByte]).asJava)
+  }
+
+  def handleError(request: RequestChannel.Request, e: Throwable): Unit = {
+    val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !request.header.apiKey.clusterAction
+    error("Error when handling request: " +
+      s"clientId=${request.header.clientId}, " +
+      s"correlationId=${request.header.correlationId}, " +
+      s"api=${request.header.apiKey}, " +
+      s"version=${request.header.apiVersion}, " +
+      s"body=${request.body[AbstractRequest]}", e)
+    if (mayThrottle)
+      sendErrorResponseMaybeThrottle(request, e)
+    else
+      sendErrorResponseExemptThrottle(request, e)
+  }
+
+  def sendForwardedResponse(
+    request: RequestChannel.Request,
+    response: AbstractResponse
+  ): Unit = {
+    // For forwarded requests, we take the throttle time from the broker that
+    // the request was forwarded to
+    val throttleTimeMs = response.throttleTimeMs()
+    quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
+    requestChannel.sendResponse(request, Some(response), None)
+  }
+
+  // Throttle the channel if the request quota is enabled but has been violated. Regardless of throttling, send the
+  // response immediately.
+  def sendResponseMaybeThrottle(request: RequestChannel.Request,
+                                createResponse: Int => AbstractResponse,
+                                onComplete: Option[Send => Unit] = None): Unit = {

Review comment:
       ```onComplete``` is unused.

##########
File path: core/src/main/scala/kafka/server/ApisUtils.scala
##########
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.lang.{Byte => JByte}
+import java.util.Collections
+
+import kafka.network.RequestChannel
+import kafka.security.authorizer.AclEntry
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Logging
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.network.Send
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestContext}
+import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
+import org.apache.kafka.common.resource.ResourceType.CLUSTER
+import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Helper class for request handlers. Provides common functionality around throttling, authorizations, and error handling
+ */
+class ApisUtils(val requestChannel: RequestChannel,
+                val authorizer: Option[Authorizer],
+                val quotas: QuotaManagers,
+                val time: Time) extends Logging {
+
+  // private package for testing
+  def authorize(requestContext: RequestContext,
+                operation: AclOperation,
+                resourceType: ResourceType,
+                resourceName: String,
+                logIfAllowed: Boolean = true,
+                logIfDenied: Boolean = true,
+                refCount: Int = 1): Boolean = {
+    authorizer.forall { authZ =>
+      val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
+      val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
+      authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
+    }
+  }
+
+  def authorizeClusterOperation(request: RequestChannel.Request, operation: AclOperation): Unit = {
+    if (!authorize(request.context, operation, CLUSTER, CLUSTER_NAME))
+      throw new ClusterAuthorizationException(s"Request $request is not authorized.")
+  }
+
+  def authorizedOperations(request: RequestChannel.Request, resource: Resource): Int = {
+    val supportedOps = AclEntry.supportedOperations(resource.resourceType).toList
+    val authorizedOps = authorizer match {
+      case Some(authZ) =>
+        val resourcePattern = new ResourcePattern(resource.resourceType, resource.name, PatternType.LITERAL)
+        val actions = supportedOps.map { op => new Action(op, resourcePattern, 1, false, false) }
+        authZ.authorize(request.context, actions.asJava).asScala
+          .zip(supportedOps)
+          .filter(_._1 == AuthorizationResult.ALLOWED)
+          .map(_._2).toSet
+      case None =>
+        supportedOps.toSet
+    }
+    Utils.to32BitField(authorizedOps.map(operation => operation.code.asInstanceOf[JByte]).asJava)
+  }
+
+  def handleError(request: RequestChannel.Request, e: Throwable): Unit = {
+    val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !request.header.apiKey.clusterAction
+    error("Error when handling request: " +
+      s"clientId=${request.header.clientId}, " +
+      s"correlationId=${request.header.correlationId}, " +
+      s"api=${request.header.apiKey}, " +
+      s"version=${request.header.apiVersion}, " +
+      s"body=${request.body[AbstractRequest]}", e)
+    if (mayThrottle)
+      sendErrorResponseMaybeThrottle(request, e)
+    else
+      sendErrorResponseExemptThrottle(request, e)
+  }
+
+  def sendForwardedResponse(
+    request: RequestChannel.Request,
+    response: AbstractResponse
+  ): Unit = {
+    // For forwarded requests, we take the throttle time from the broker that
+    // the request was forwarded to
+    val throttleTimeMs = response.throttleTimeMs()
+    quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
+    requestChannel.sendResponse(request, Some(response), None)
+  }
+
+  // Throttle the channel if the request quota is enabled but has been violated. Regardless of throttling, send the
+  // response immediately.
+  def sendResponseMaybeThrottle(request: RequestChannel.Request,
+                                createResponse: Int => AbstractResponse,
+                                onComplete: Option[Send => Unit] = None): Unit = {
+    val throttleTimeMs = maybeRecordAndGetThrottleTimeMs(request)
+    // Only throttle non-forwarded requests
+    if (!request.isForwarded)
+      quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
+    requestChannel.sendResponse(request, Some(createResponse(throttleTimeMs)), None)
+  }
+
+  def sendErrorResponseMaybeThrottle(request: RequestChannel.Request, error: Throwable): Unit = {
+    val throttleTimeMs = maybeRecordAndGetThrottleTimeMs(request)
+    // Only throttle non-forwarded requests or cluster authorization failures
+    if (error.isInstanceOf[ClusterAuthorizationException] || !request.isForwarded)
+      quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
+    sendErrorOrCloseConnection(request, error, throttleTimeMs)
+  }
+
+  private def maybeRecordAndGetThrottleTimeMs(request: RequestChannel.Request): Int = {
+    val throttleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request, time.milliseconds())
+    request.apiThrottleTimeMs = throttleTimeMs
+    throttleTimeMs
+  }
+
+  /**
+   * Throttle the channel if the controller mutations quota or the request quota have been violated.
+   * Regardless of throttling, send the response immediately.
+   */
+  def sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota: ControllerMutationQuota,
+                                                   request: RequestChannel.Request,
+                                                   createResponse: Int => AbstractResponse,
+                                                   onComplete: Option[Send => Unit]): Unit = {

Review comment:
       ditto

##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -432,6 +432,43 @@ class RequestChannel(val queueSize: Int,
     }
   }
 
+  def sendResponse(request: RequestChannel.Request,
+                   responseOpt: Option[AbstractResponse],
+                   onComplete: Option[Send => Unit]): Unit = {
+    // Update error metrics for each error code in the response including Errors.NONE
+    responseOpt.foreach(response => updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala))
+
+    val response = responseOpt match {
+      case Some(response) =>
+        new RequestChannel.SendResponse(
+          request,
+          request.buildResponseSend(response),
+          request.responseString(response),
+          onComplete
+        )
+      case None =>
+        new RequestChannel.NoOpResponse(request)
+    }
+
+    sendResponse(response)
+  }
+
+  def sendErrorOrCloseConnection(request: RequestChannel.Request, error: Throwable, throttleMs: Int): Unit = {

Review comment:
       There is another "sendErrorOrCloseConnection" in ```ApisUtils```.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -98,6 +97,7 @@ import scala.annotation.nowarn
  * Logic to handle the various Kafka requests
  */
 class KafkaApis(val requestChannel: RequestChannel,
+                val apisUtils: ApisUtils,

Review comment:
       Instantiating a "utils" object is a bit weird to me. It seems to me we can make ApisUtils a ```trait``` with a self-type (```this: KafkaApis =>```) if the main purpose of this PR is to reduce the size of ```KafkaApis```. The benefits of using a self-type are that:
   1. we don't need to instantiate a "utils"
   2. we can move some code from KafkaApis to ApisUtils
   3. we don't need to change "autxxx" to "apisUtils.autxxx"




----------------------------------------------------------------



[GitHub] [kafka] chia7712 commented on pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#issuecomment-741876404


   > I think it makes sense to go with a simple change here that extracts commonly used methods. If we want to discuss breaking up KafkaApis, that is a fair discussion, but can happen separately. Does that make sense
   
   makes sense to me :)


----------------------------------------------------------------



[GitHub] [kafka] ijuma commented on a change in pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#discussion_r557460726



##########
File path: core/src/main/scala/kafka/server/RequestHandlerUtils.scala
##########
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.cluster.Partition
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+
+import java.lang.{Byte => JByte}
+import java.util.Collections
+import kafka.network.RequestChannel
+import kafka.security.authorizer.AclEntry
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Logging
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.network.Send
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestContext}
+import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
+import org.apache.kafka.common.resource.ResourceType.CLUSTER
+import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Helper methods for request handlers
+ */
+object RequestHandlerUtils {
+  def onLeadershipChange(groupCoordinator: GroupCoordinator,
+                         txnCoordinator: TransactionCoordinator,
+                         updatedLeaders: Iterable[Partition],
+                         updatedFollowers: Iterable[Partition]): Unit = {
+    // for each new leader or follower, call coordinator to handle consumer group migration.
+    // this callback is invoked under the replica state change lock to ensure proper order of
+    // leadership changes
+    updatedLeaders.foreach { partition =>
+      if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
+        groupCoordinator.onElection(partition.partitionId)
+      else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
+        txnCoordinator.onElection(partition.partitionId, partition.getLeaderEpoch)
+    }
+
+    updatedFollowers.foreach { partition =>
+      if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
+        groupCoordinator.onResignation(partition.partitionId)
+      else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
+        txnCoordinator.onResignation(partition.partitionId, Some(partition.getLeaderEpoch))
+    }
+  }
+}
+
+class AuthHelper(val requestChannel: RequestChannel,

Review comment:
       I would move this to its own file. It seems unrelated to the other things in this class.




----------------------------------------------------------------



[GitHub] [kafka] mumrah commented on a change in pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#discussion_r557444603



##########
File path: core/src/main/scala/kafka/server/ApisUtils.scala
##########
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.cluster.Partition
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+
+import java.lang.{Byte => JByte}
+import java.util.Collections
+import kafka.network.RequestChannel
+import kafka.security.authorizer.AclEntry
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Logging
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.network.Send
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestContext}
+import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
+import org.apache.kafka.common.resource.ResourceType.CLUSTER
+import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
+import org.apache.kafka.common.utils.{LogContext, Time, Utils}
+import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Helper methods and helper class factories for request handlers. Provides common functionality around throttling,
+ * authorizations, and error handling
+ */
+object ApisUtils {

Review comment:
       It is pretty awkward, but `ApiUtils` was taken. `RequestHandlerUtils` sounds good.




----------------------------------------------------------------



[GitHub] [kafka] ijuma commented on a change in pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#discussion_r557461217



##########
File path: core/src/main/scala/kafka/server/RequestHandlerUtils.scala
##########
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.cluster.Partition
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+
+import java.lang.{Byte => JByte}
+import java.util.Collections
+import kafka.network.RequestChannel
+import kafka.security.authorizer.AclEntry
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Logging
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.network.Send
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestContext}
+import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
+import org.apache.kafka.common.resource.ResourceType.CLUSTER
+import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Helper methods for request handlers
+ */
+object RequestHandlerUtils {
+  def onLeadershipChange(groupCoordinator: GroupCoordinator,
+                         txnCoordinator: TransactionCoordinator,
+                         updatedLeaders: Iterable[Partition],
+                         updatedFollowers: Iterable[Partition]): Unit = {
+    // for each new leader or follower, call coordinator to handle consumer group migration.
+    // this callback is invoked under the replica state change lock to ensure proper order of
+    // leadership changes
+    updatedLeaders.foreach { partition =>
+      if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
+        groupCoordinator.onElection(partition.partitionId)
+      else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
+        txnCoordinator.onElection(partition.partitionId, partition.getLeaderEpoch)
+    }
+
+    updatedFollowers.foreach { partition =>
+      if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
+        groupCoordinator.onResignation(partition.partitionId)
+      else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
+        txnCoordinator.onResignation(partition.partitionId, Some(partition.getLeaderEpoch))
+    }
+  }
+}
+
+class AuthHelper(val requestChannel: RequestChannel,
+                 val authorizer: Option[Authorizer]) {
+  def authorize(requestContext: RequestContext,
+                operation: AclOperation,
+                resourceType: ResourceType,
+                resourceName: String,
+                logIfAllowed: Boolean = true,
+                logIfDenied: Boolean = true,
+                refCount: Int = 1): Boolean = {
+    authorizer.forall { authZ =>
+      val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
+      val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
+      authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
+    }
+  }
+
+  def authorizeClusterOperation(request: RequestChannel.Request, operation: AclOperation): Unit = {
+    if (!authorize(request.context, operation, CLUSTER, CLUSTER_NAME))
+      throw new ClusterAuthorizationException(s"Request $request is not authorized.")
+  }
+
+  def authorizedOperations(request: RequestChannel.Request, resource: Resource): Int = {
+    val supportedOps = AclEntry.supportedOperations(resource.resourceType).toList
+    val authorizedOps = authorizer match {
+      case Some(authZ) =>
+        val resourcePattern = new ResourcePattern(resource.resourceType, resource.name, PatternType.LITERAL)
+        val actions = supportedOps.map { op => new Action(op, resourcePattern, 1, false, false) }
+        authZ.authorize(request.context, actions.asJava).asScala
+          .zip(supportedOps)
+          .filter(_._1 == AuthorizationResult.ALLOWED)
+          .map(_._2).toSet
+      case None =>
+        supportedOps.toSet
+    }
+    Utils.to32BitField(authorizedOps.map(operation => operation.code.asInstanceOf[JByte]).asJava)
+  }
+}
+
+class ChannelHelper(val requestChannel: RequestChannel,

Review comment:
       Should this be `RequestHandlerHelper` then?




----------------------------------------------------------------



[GitHub] [kafka] ijuma merged pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
ijuma merged pull request #9715:
URL: https://github.com/apache/kafka/pull/9715


   


----------------------------------------------------------------



[GitHub] [kafka] mumrah commented on pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#issuecomment-761171619


   The test failures were due to unhappy mocks in KafkaApisTest. It looks rather tedious to fix the mocks, so I ended up moving two of the methods I had put into RequestChannel back into the helper class. The tests are now passing for me locally.


----------------------------------------------------------------



[GitHub] [kafka] ijuma commented on a change in pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#discussion_r539429096



##########
File path: core/src/main/scala/kafka/server/KafkaRequestHandler.scala
##########
@@ -34,6 +34,10 @@ trait ApiRequestHandler {
   def handle(request: RequestChannel.Request): Unit
 }
 
+trait BaseApis extends ApiRequestHandler {

Review comment:
       Self-types like this are an anti-pattern, in my opinion. They were used years ago as a dependency-injection replacement (the cake pattern), but people have moved on, since they tend to be hard to understand.
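
   For contrast, a standalone sketch (hypothetical names) of plain constructor delegation, which is roughly the shape the PR ended up with (helper classes such as AuthHelper passed in as collaborators rather than mixed in):

   ```scala
   // The helper is constructed once and passed in, rather than mixed in.
   class AuthHelperDemo(allowedPrincipals: Set[String]) {
     def isAuthorized(principal: String): Boolean = allowedPrincipals.contains(principal)
   }

   class DemoRequestHandler(authHelper: AuthHelperDemo) {
     def handle(principal: String): String =
       if (authHelper.isAuthorized(principal)) "handled" else "denied"
   }

   object DelegationDemo extends App {
     val handler = new DemoRequestHandler(new AuthHelperDemo(Set("admin")))
     println(handler.handle("admin")) // handled
     println(handler.handle("guest")) // denied
   }
   ```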




----------------------------------------------------------------



[GitHub] [kafka] hachikuji commented on a change in pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#discussion_r556993362



##########
File path: core/src/main/scala/kafka/server/ApisUtils.scala
##########
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.cluster.Partition
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+
+import java.lang.{Byte => JByte}
+import java.util.Collections
+import kafka.network.RequestChannel
+import kafka.security.authorizer.AclEntry
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Logging
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.network.Send
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestContext}
+import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
+import org.apache.kafka.common.resource.ResourceType.CLUSTER
+import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
+import org.apache.kafka.common.utils.{LogContext, Time, Utils}
+import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Helper methods and helper class factories for request handlers. Provides common functionality around throttling,
+ * authorizations, and error handling
+ */
+object ApisUtils {

Review comment:
       Might be just me, but the plural `Apis` reads awkwardly to me. How about `RequestHandlerUtils`?




----------------------------------------------------------------



[GitHub] [kafka] mumrah commented on a change in pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#discussion_r556887264



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -432,6 +432,43 @@ class RequestChannel(val queueSize: Int,
     }
   }
 
+  def sendResponse(request: RequestChannel.Request,
+                   responseOpt: Option[AbstractResponse],
+                   onComplete: Option[Send => Unit]): Unit = {
+    // Update error metrics for each error code in the response including Errors.NONE
+    responseOpt.foreach(response => updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala))
+
+    val response = responseOpt match {
+      case Some(response) =>
+        new RequestChannel.SendResponse(
+          request,
+          request.buildResponseSend(response),
+          request.responseString(response),
+          onComplete
+        )
+      case None =>
+        new RequestChannel.NoOpResponse(request)
+    }
+
+    sendResponse(response)
+  }
+
+  def sendErrorOrCloseConnection(request: RequestChannel.Request, error: Throwable, throttleMs: Int): Unit = {

Review comment:
       I suppose I put it here since it doesn't need any of the other objects that are provided by ApisUtils; it just needs the RequestChannel.




----------------------------------------------------------------



[GitHub] [kafka] ijuma commented on pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#issuecomment-741871617


   Note that the goal here is not to reduce the `KafkaApis` footprint; it is to allow reuse from `ControllerApis`. The latter is required for the KIP-500 effort. I think it makes sense to go with a simple change here that extracts commonly used methods. If we want to discuss breaking up `KafkaApis`, that is a fair discussion, but can happen separately. Does that make sense, @chia7712?


----------------------------------------------------------------



[GitHub] [kafka] ijuma commented on a change in pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#discussion_r556818362



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -412,8 +412,8 @@ class RequestChannel(val queueSize: Int,
   }
 
   def sendResponse(request: RequestChannel.Request,
-                   responseOpt: Option[AbstractResponse],
-                   onComplete: Option[Send => Unit]): Unit = {
+                           responseOpt: Option[AbstractResponse],
+                           onComplete: Option[Send => Unit]): Unit = {

Review comment:
       Is this intentional?

##########
File path: core/src/main/scala/kafka/server/ApisUtils.scala
##########
@@ -109,7 +106,7 @@ trait ApisUtils extends Logging {
   // Throttle the channel if the request quota is enabled but has been violated. Regardless of throttling, send the
   // response immediately.
   def sendResponseMaybeThrottle(request: RequestChannel.Request,
-                                createResponse: Int => AbstractResponse): Unit = {
+                                        createResponse: Int => AbstractResponse): Unit = {

Review comment:
       Is this intentional?




----------------------------------------------------------------



[GitHub] [kafka] ijuma commented on a change in pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#discussion_r557461217



##########
File path: core/src/main/scala/kafka/server/RequestHandlerUtils.scala
##########
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.cluster.Partition
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+
+import java.lang.{Byte => JByte}
+import java.util.Collections
+import kafka.network.RequestChannel
+import kafka.security.authorizer.AclEntry
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Logging
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.network.Send
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestContext}
+import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
+import org.apache.kafka.common.resource.ResourceType.CLUSTER
+import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Helper methods for request handlers
+ */
+object RequestHandlerUtils {
+  def onLeadershipChange(groupCoordinator: GroupCoordinator,
+                         txnCoordinator: TransactionCoordinator,
+                         updatedLeaders: Iterable[Partition],
+                         updatedFollowers: Iterable[Partition]): Unit = {
+    // for each new leader or follower, call coordinator to handle consumer group migration.
+    // this callback is invoked under the replica state change lock to ensure proper order of
+    // leadership changes
+    updatedLeaders.foreach { partition =>
+      if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
+        groupCoordinator.onElection(partition.partitionId)
+      else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
+        txnCoordinator.onElection(partition.partitionId, partition.getLeaderEpoch)
+    }
+
+    updatedFollowers.foreach { partition =>
+      if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
+        groupCoordinator.onResignation(partition.partitionId)
+      else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
+        txnCoordinator.onResignation(partition.partitionId, Some(partition.getLeaderEpoch))
+    }
+  }
+}
+
+class AuthHelper(val requestChannel: RequestChannel,
+                 val authorizer: Option[Authorizer]) {
+  def authorize(requestContext: RequestContext,
+                operation: AclOperation,
+                resourceType: ResourceType,
+                resourceName: String,
+                logIfAllowed: Boolean = true,
+                logIfDenied: Boolean = true,
+                refCount: Int = 1): Boolean = {
+    authorizer.forall { authZ =>
+      val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
+      val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
+      authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
+    }
+  }
+
+  def authorizeClusterOperation(request: RequestChannel.Request, operation: AclOperation): Unit = {
+    if (!authorize(request.context, operation, CLUSTER, CLUSTER_NAME))
+      throw new ClusterAuthorizationException(s"Request $request is not authorized.")
+  }
+
+  def authorizedOperations(request: RequestChannel.Request, resource: Resource): Int = {
+    val supportedOps = AclEntry.supportedOperations(resource.resourceType).toList
+    val authorizedOps = authorizer match {
+      case Some(authZ) =>
+        val resourcePattern = new ResourcePattern(resource.resourceType, resource.name, PatternType.LITERAL)
+        val actions = supportedOps.map { op => new Action(op, resourcePattern, 1, false, false) }
+        authZ.authorize(request.context, actions.asJava).asScala
+          .zip(supportedOps)
+          .filter(_._1 == AuthorizationResult.ALLOWED)
+          .map(_._2).toSet
+      case None =>
+        supportedOps.toSet
+    }
+    Utils.to32BitField(authorizedOps.map(operation => operation.code.asInstanceOf[JByte]).asJava)
+  }
+}
+
+class ChannelHelper(val requestChannel: RequestChannel,

Review comment:
       Should this be `RequestHandlerHelper` then? Maybe we should ditch the `Utils` name and only have `RequestHandlerHelper`, with the static methods in its companion object.
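
   A standalone sketch (hypothetical names) of that layout: instance methods on the helper class, stateless helpers as "static" methods on its companion object:

   ```scala
   // Instance methods use state passed to the constructor; the companion
   // object holds the stateless helpers (Scala's equivalent of static methods).
   class RequestHandlerHelperDemo(logPrefix: String) {
     def describe(api: String): String = s"$logPrefix$api"
   }

   object RequestHandlerHelperDemo {
     def sanitize(api: String): String = api.trim.toLowerCase
   }

   object CompanionDemo extends App {
     val helper = new RequestHandlerHelperDemo("handler-")
     println(helper.describe(RequestHandlerHelperDemo.sanitize("  Fetch "))) // prints "handler-fetch"
   }
   ```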




----------------------------------------------------------------



[GitHub] [kafka] ijuma commented on a change in pull request #9715: Upstream ApisUtils from kip-500

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#discussion_r539390329



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -98,6 +97,7 @@ import scala.annotation.nowarn
  * Logic to handle the various Kafka requests
  */
 class KafkaApis(val requestChannel: RequestChannel,
+                val apisUtils: ApisUtils,

Review comment:
       Is this an improvement? I am not sure. A couple of things:
   1. The `Utils` suffix looks odd both in a trait and in a class that is instantiated.
   2. Using delegation allows for better separation of concerns than inheritance.




----------------------------------------------------------------