You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/16 15:34:04 UTC

[GitHub] [kafka] rajinisivaram commented on a change in pull request #8933: KAFKA-10163; Throttle Create Topic, Create Partition and Delete Topic Operations (KIP-599, Part I, Broker Changes)

rajinisivaram commented on a change in pull request #8933:
URL: https://github.com/apache/kafka/pull/8933#discussion_r455832706



##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -295,34 +312,44 @@ class AdminManager(val config: KafkaConfig,
           throw new InvalidPartitionsException(s"Topic already has $oldNumPartitions partitions.")
         }
 
-        val newPartitionsAssignment = Option(newPartition.assignments)
-          .map { assignmentMap =>
-            val assignments = assignmentMap.asScala.map {
-              createPartitionAssignment => createPartitionAssignment.brokerIds.asScala.map(_.toInt)
-            }
-            val unknownBrokers = assignments.flatten.toSet -- allBrokerIds
-            if (unknownBrokers.nonEmpty)
-              throw new InvalidReplicaAssignmentException(
-                s"Unknown broker(s) in replica assignment: ${unknownBrokers.mkString(", ")}.")
-
-            if (assignments.size != numPartitionsIncrement)
-              throw new InvalidReplicaAssignmentException(
-                s"Increasing the number of partitions by $numPartitionsIncrement " +
-                  s"but ${assignments.size} assignments provided.")
-
-            assignments.zipWithIndex.map { case (replicas, index) =>
-              existingAssignment.size + index -> replicas
-            }.toMap
+        val newPartitionsAssignment = Option(newPartition.assignments).map { assignmentMap =>
+          val assignments = assignmentMap.asScala.map {
+            createPartitionAssignment => createPartitionAssignment.brokerIds.asScala.map(_.toInt)
+          }
+          val unknownBrokers = assignments.flatten.toSet -- allBrokerIds
+          if (unknownBrokers.nonEmpty)
+            throw new InvalidReplicaAssignmentException(
+              s"Unknown broker(s) in replica assignment: ${unknownBrokers.mkString(", ")}.")
+
+          if (assignments.size != numPartitionsIncrement)
+            throw new InvalidReplicaAssignmentException(
+              s"Increasing the number of partitions by $numPartitionsIncrement " +
+                s"but ${assignments.size} assignments provided.")
+
+          assignments.zipWithIndex.map { case (replicas, index) =>
+            existingAssignment.size + index -> replicas
+          }.toMap
         }
 
-        val updatedReplicaAssignment = adminZkClient.addPartitions(topic, existingAssignment, allBrokers,
-          newPartition.count, newPartitionsAssignment, validateOnly = validateOnly)
-        CreatePartitionsMetadata(topic, updatedReplicaAssignment.keySet, ApiError.NONE)
+        val assignmentForNewPartitions = adminZkClient.createNewPartitionsAssignment(
+          topic, existingAssignment, allBrokers, newPartition.count, newPartitionsAssignment)
+
+        if (validateOnly) {
+          CreatePartitionsMetadata(topic, (existingAssignment ++ assignmentForNewPartitions).keySet)

Review comment:
       Shouldn't validateOnly tell you how much you would have been throttled?

##########
File path: core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
##########
@@ -20,21 +20,32 @@ import java.util.concurrent.TimeUnit
 
 import kafka.network.RequestChannel
 import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.Sensor.QuotaEnforcementType
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.quota.ClientQuotaCallback
 
 import scala.jdk.CollectionConverters._
 
+object ClientRequestQuotaManager {
+  val QuotaRequestPercentDefault = Int.MaxValue.toDouble
+  val NanosToPercentagePerSecond = 100.0 / TimeUnit.SECONDS.toNanos(1)
+}
 
 class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
                                 private val metrics: Metrics,
                                 private val time: Time,
-                                threadNamePrefix: String,
-                                quotaCallback: Option[ClientQuotaCallback])
-                                extends ClientQuotaManager(config, metrics, QuotaType.Request, time, threadNamePrefix, quotaCallback) {
-  val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
-  def exemptSensor = getOrCreateSensor(exemptSensorName, exemptMetricName)
+                                private val threadNamePrefix: String,
+                                private val quotaCallback: Option[ClientQuotaCallback])
+    extends ClientQuotaManager(config, metrics, QuotaType.Request, QuotaEnforcementType.PERMISSIVE,
+      time, threadNamePrefix, quotaCallback) {
+
+  private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
+  private val exemptMetricName = metrics.metricName("exempt-request-time",
+    QuotaType.Request.toString, "Tracking exempt-request-time utilization percentage")
+  private val exemptSensorName = "exempt-" + QuotaType.Request

Review comment:
       We might as well move the name to the companion object above since it is a constant?

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
##########
@@ -97,7 +97,25 @@ public static RecordingLevel forName(String name) {
         public boolean shouldRecord(final int configId) {
             return configId == DEBUG.id || configId == this.id;
         }
+    }
 
+    public enum QuotaEnforcementType {

Review comment:
       Do we really need this? I didn't find the naming of the types very intuitive. We have different variations on the usage of the APIs. But given that we have a public API that enables us to check quotas and another to record with or without the check, couldn't we just use the existing APIs? For example, we already have record without checking quota, we have record with check quota, and we have record followed by unrecord. And now we want to check without recording. It seems simpler to leave it to the quota manager to record zero with checkQuotas first, and then record the value without checkQuotas if the first one succeeds.

##########
File path: core/src/main/scala/kafka/server/ClientQuotaManager.scala
##########
@@ -234,55 +266,85 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   }
 
   /**
-   * Returns maximum value (produced/consume bytes or request processing time) that could be recorded without guaranteed throttling.
-   * Recording any larger value will always be throttled, even if no other values were recorded in the quota window.
-   * This is used for deciding the maximum bytes that can be fetched at once
+   * Records that a user/clientId accumulated or would like to accumulate the provided amount at the
+   * the specified time, returns throttle time in milliseconds. Depending on the {QuotaEnforcementType}
+   * used, the behavior of this method changes:
+   * - QuotaEnforcementType.Strict verifies the quota is not violated before accumulating the
+   *   provided value. If it is, the value is not accumulated and the throttle time represents
+   *   the time to wait before the quota comes back to the defined limit.
+   * - QuotaEnforcementType.PERMISSIVE verifies the quota is not violated after accumulating the
+   *   provided value. If it is, the value is still accumulated and the throttle time represents
+   *   the time to wait before the quota comes back to the defined limit.
+   *
+   * @param session The session from which the user is extracted
+   * @param clientId The client id
+   * @param value The value to accumulate
+   * @param timeMs The time at which to accumulate the value
+   * @return The throttle time in milliseconds defines as the time to wait until the average
+   *         rate gets back to the defined quota
    */
-  def getMaxValueInQuotaWindow(session: Session, clientId: String): Double = {
-    if (quotasEnabled) {
-      val clientSensors = getOrCreateQuotaSensors(session, clientId)
-      Option(quotaCallback.quotaLimit(clientQuotaType, clientSensors.metricTags.asJava))
-        .map(_.toDouble * (config.numQuotaSamples - 1) * config.quotaWindowSizeSeconds)
-        .getOrElse(Double.MaxValue)
-    } else {
-      Double.MaxValue
-    }
-  }
-
   def recordAndGetThrottleTimeMs(session: Session, clientId: String, value: Double, timeMs: Long): Int = {
     val clientSensors = getOrCreateQuotaSensors(session, clientId)
     try {
-      clientSensors.quotaSensor.record(value, timeMs)
+      clientSensors.quotaSensor.record(value, timeMs, quotaEnforcementType)
       0
     } catch {
       case e: QuotaViolationException =>
-        val throttleTimeMs = throttleTime(e.value, e.bound, windowSize(e.metric, timeMs)).toInt
+        val throttleTimeMs = throttleTime(e, timeMs).toInt
         debug(s"Quota violated for sensor (${clientSensors.quotaSensor.name}). Delay time: ($throttleTimeMs)")
         throttleTimeMs
     }
   }
 
-  /** "Unrecord" the given value that has already been recorded for the given user/client by recording a negative value
-    * of the same quantity.
-    *
-    * For a throttled fetch, the broker should return an empty response and thus should not record the value. Ideally,
-    * we would like to compute the throttle time before actually recording the value, but the current Sensor code
-    * couples value recording and quota checking very tightly. As a workaround, we will unrecord the value for the fetch
-    * in case of throttling. Rate keeps the sum of values that fall in each time window, so this should bring the
-    * overall sum back to the previous value.
-    */
+  /**
+   * Records that a user/clientId changed some metric being throttled without checking for
+   * quota violation. The aggregate value will subsequently be used for throttling when the
+   * next request is processed.
+   */
+  def recordNoThrottle(session: Session, clientId: String, value: Double): Unit = {
+    val clientSensors = getOrCreateQuotaSensors(session, clientId)
+    clientSensors.quotaSensor.record(value, time.milliseconds(), QuotaEnforcementType.NONE)
+  }
+
+  /**
+   * "Unrecord" the given value that has already been recorded for the given user/client by recording a negative value
+   * of the same quantity.
+   *
+   * For a throttled fetch, the broker should return an empty response and thus should not record the value. Ideally,
+   * we would like to compute the throttle time before actually recording the value, but the current Sensor code
+   * couples value recording and quota checking very tightly. As a workaround, we will unrecord the value for the fetch
+   * in case of throttling. Rate keeps the sum of values that fall in each time window, so this should bring the
+   * overall sum back to the previous value.
+   */
   def unrecordQuotaSensor(request: RequestChannel.Request, value: Double, timeMs: Long): Unit = {
     val clientSensors = getOrCreateQuotaSensors(request.session, request.header.clientId)
-    clientSensors.quotaSensor.record(value * (-1), timeMs, false)
+    clientSensors.quotaSensor.record(value * (-1), timeMs, QuotaEnforcementType.NONE)
   }
 
   /**
-    * Throttle a client by muting the associated channel for the given throttle time.
-    * @param request client request
-    * @param throttleTimeMs Duration in milliseconds for which the channel is to be muted.
-    * @param channelThrottlingCallback Callback for channel throttling
-    * @return ThrottledChannel object
-    */
+   * Returns maximum value that could be recorded without guaranteed throttling.
+   * Recording any larger value will always be throttled, even if no other values were recorded in the quota window.
+   * This is used for deciding the maximum bytes that can be fetched at once
+   */
+  def getMaxValueInQuotaWindow(session: Session, clientId: String): Double = {
+    if (quotasEnabled) {
+      val clientSensors = getOrCreateQuotaSensors(session, clientId)
+      Option(quotaCallback.quotaLimit(clientQuotaType, clientSensors.metricTags.asJava))
+        .map(_.toDouble * (config.numQuotaSamples - 1) * config.quotaWindowSizeSeconds)
+        .getOrElse(Double.MaxValue)
+    } else {
+      Double.MaxValue
+    }
+  }
+
+  /**
+   * Throttle a client by muting the associated channel for the given throttle time.
+   *
+   * @param request client request
+   * @param throttleTimeMs Duration in milliseconds for which the channel is to be muted.
+   * @param channelThrottlingCallback Callback for channel throttling
+   * @return ThrottledChannel object

Review comment:
       Doesn't return anything?

##########
File path: core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
##########
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.network.RequestChannel
+import kafka.network.RequestChannel.Session
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.metrics.QuotaViolationException
+import org.apache.kafka.common.metrics.Sensor
+import org.apache.kafka.common.metrics.Sensor.QuotaEnforcementType
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.quota.ClientQuotaCallback
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * The ControllerMutationQuota trait defines a quota for a given user/clientId pair. Such
+ * quota is not meant to be cached forever but rather during the lifetime of processing
+ * a request.
+ */
+trait ControllerMutationQuota {

Review comment:
       We should include `Request` in the names of these traits and associated classes since these are per-request.

##########
File path: core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
##########
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.network.RequestChannel
+import kafka.network.RequestChannel.Session
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.metrics.QuotaViolationException
+import org.apache.kafka.common.metrics.Sensor
+import org.apache.kafka.common.metrics.Sensor.QuotaEnforcementType
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.quota.ClientQuotaCallback
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * The ControllerMutationQuota trait defines a quota for a given user/clientId pair. Such
+ * quota is not meant to be cached forever but rather during the lifetime of processing
+ * a request.
+ */
+trait ControllerMutationQuota {
+  def isExceeded: Boolean
+  def accept(permits: Double): Unit
+  def throttleTime: Int
+}
+
+/**
+ * Default quota used when quota is disabled.
+ */
+object UnboundedControllerMutationQuota extends ControllerMutationQuota {
+  override def isExceeded: Boolean = false
+  override def accept(permits: Double): Unit = ()
+  override def throttleTime: Int = 0
+}
+
+/**
+ * The StrictControllerMutationQuota defines a strict quota for a given user/clientId pair. The
+ * quota is strict meaning that it does not accept any mutations once the quota is exhausted until
+ * it gets back to the defined rate.
+ *
+ * @param time @Time object to use
+ * @param quotaSensor @Sensor object with a defined quota for a given user/clientId pair
+ */
+class StrictControllerMutationQuota(private val time: Time,
+                                    private val quotaSensor: Sensor) extends ControllerMutationQuota {
+
+  private var lastThrottleTimeMs = 0L
+  private var lastRecordedTimeMs = 0L
+
+  override def isExceeded: Boolean = lastThrottleTimeMs > 0
+
+  override def accept(permits: Double): Unit = {
+    val timeMs = time.milliseconds
+    try {
+      quotaSensor.record(permits, timeMs, QuotaEnforcementType.STRICT)
+    } catch {
+      case e: QuotaViolationException =>
+        lastThrottleTimeMs = ClientQuotaManager.throttleTime(e, timeMs)
+        lastRecordedTimeMs = timeMs
+        throw new ThrottlingQuotaExceededException(lastThrottleTimeMs.toInt,
+          Errors.THROTTLING_QUOTA_EXCEEDED.message)
+    }
+  }
+
+  override def throttleTime: Int = {
+    // If a throttle time has been recorded, we adjust it by deducting the time elapsed
+    // between the recording and now. We do this because `throttleTime` may be called
+    // long after having recorded it (e.g. when creating topics).
+    val deltaTimeMs = time.milliseconds - lastRecordedTimeMs
+    Math.max(0, lastThrottleTimeMs - deltaTimeMs).toInt
+  }
+}
+
+/**
+ * The PermissiveControllerMutationQuota defines a permissive quota for a given user/clientId pair.
+ * The quota is permissive meaning that it does accept any mutations even if the quota is exhausted.
+ *
+ * @param time @Time object to use
+ * @param quotaSensor @Sensor object with a defined quota for a given user/clientId pair
+ */
+class PermissiveControllerMutationQuota(private val time: Time,
+                                        private val quotaSensor: Sensor) extends ControllerMutationQuota {

Review comment:
       Looks nearly identical to `StrictControllerMutationQuota` apart from the enforcement type; can we reuse code?

##########
File path: core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
##########
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.network.RequestChannel
+import kafka.network.RequestChannel.Session
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.metrics.QuotaViolationException
+import org.apache.kafka.common.metrics.Sensor
+import org.apache.kafka.common.metrics.Sensor.QuotaEnforcementType
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.quota.ClientQuotaCallback
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * The ControllerMutationQuota trait defines a quota for a given user/clientId pair. Such
+ * quota is not meant to be cached forever but rather during the lifetime of processing
+ * a request.
+ */
+trait ControllerMutationQuota {
+  def isExceeded: Boolean
+  def accept(permits: Double): Unit
+  def throttleTime: Int
+}
+
+/**
+ * Default quota used when quota is disabled.
+ */
+object UnboundedControllerMutationQuota extends ControllerMutationQuota {
+  override def isExceeded: Boolean = false
+  override def accept(permits: Double): Unit = ()
+  override def throttleTime: Int = 0
+}
+
+/**
+ * The StrictControllerMutationQuota defines a strict quota for a given user/clientId pair. The
+ * quota is strict meaning that it does not accept any mutations once the quota is exhausted until
+ * it gets back to the defined rate.
+ *
+ * @param time @Time object to use
+ * @param quotaSensor @Sensor object with a defined quota for a given user/clientId pair
+ */
+class StrictControllerMutationQuota(private val time: Time,
+                                    private val quotaSensor: Sensor) extends ControllerMutationQuota {
+
+  private var lastThrottleTimeMs = 0L
+  private var lastRecordedTimeMs = 0L

Review comment:
       Are these called `lastXXX` because we can record multiple times and we are interested in the last value?

##########
File path: core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
##########
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.network.RequestChannel
+import kafka.network.RequestChannel.Session
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.metrics.QuotaViolationException
+import org.apache.kafka.common.metrics.Sensor
+import org.apache.kafka.common.metrics.Sensor.QuotaEnforcementType
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.quota.ClientQuotaCallback
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * The ControllerMutationQuota trait defines a quota for a given user/clientId pair. Such
+ * quota is not meant to be cached forever but rather during the lifetime of processing
+ * a request.
+ */
+trait ControllerMutationQuota {
+  def isExceeded: Boolean
+  def accept(permits: Double): Unit

Review comment:
       nit: maybe use the same `record` terminology we use elsewhere?

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -101,12 +148,6 @@ class AdminManager(val config: KafkaConfig,
         if (nullConfigs.nonEmpty)
           throw new InvalidRequestException(s"Null value not supported for topic configs : ${nullConfigs.mkString(",")}")
 
-        val configs = new Properties()
-        topic.configs.forEach { entry =>
-          configs.setProperty(entry.name, entry.value)
-        }
-        LogConfig.validate(configs)

Review comment:
       Where is this validation done now?

##########
File path: core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
##########
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.network.RequestChannel
+import kafka.network.RequestChannel.Session
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.metrics.QuotaViolationException
+import org.apache.kafka.common.metrics.Sensor
+import org.apache.kafka.common.metrics.Sensor.QuotaEnforcementType
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.quota.ClientQuotaCallback
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * The ControllerMutationQuota trait defines a quota for a given user/clientId pair. Such
+ * quota is not meant to be cached forever but rather during the lifetime of processing
+ * a request.
+ */
+trait ControllerMutationQuota {
+  def isExceeded: Boolean
+  def accept(permits: Double): Unit
+  def throttleTime: Int
+}
+
+/**
+ * Default quota used when quota is disabled.
+ */
+object UnboundedControllerMutationQuota extends ControllerMutationQuota {
+  override def isExceeded: Boolean = false
+  override def accept(permits: Double): Unit = ()
+  override def throttleTime: Int = 0
+}
+
+/**
+ * The StrictControllerMutationQuota defines a strict quota for a given user/clientId pair. The
+ * quota is strict meaning that it does not accept any mutations once the quota is exhausted until
+ * it gets back to the defined rate.
+ *
+ * @param time @Time object to use
+ * @param quotaSensor @Sensor object with a defined quota for a given user/clientId pair
+ */
+class StrictControllerMutationQuota(private val time: Time,
+                                    private val quotaSensor: Sensor) extends ControllerMutationQuota {
+
+  private var lastThrottleTimeMs = 0L
+  private var lastRecordedTimeMs = 0L
+
+  override def isExceeded: Boolean = lastThrottleTimeMs > 0
+
+  override def accept(permits: Double): Unit = {
+    val timeMs = time.milliseconds
+    try {
+      quotaSensor.record(permits, timeMs, QuotaEnforcementType.STRICT)
+    } catch {
+      case e: QuotaViolationException =>
+        lastThrottleTimeMs = ClientQuotaManager.throttleTime(e, timeMs)
+        lastRecordedTimeMs = timeMs
+        throw new ThrottlingQuotaExceededException(lastThrottleTimeMs.toInt,
+          Errors.THROTTLING_QUOTA_EXCEEDED.message)
+    }
+  }
+
+  override def throttleTime: Int = {
+    // If a throttle time has been recorded, we adjust it by deducting the time elapsed
+    // between the recording and now. We do this because `throttleTime` may be called
+    // long after having recorded it (e.g. when creating topics).
+    val deltaTimeMs = time.milliseconds - lastRecordedTimeMs
+    Math.max(0, lastThrottleTimeMs - deltaTimeMs).toInt

Review comment:
       Not sure how this works. We calculated throttle time before creating a topic. Then spent some time using up CPU to create the topic. Are we saying that we should throttle less because the request used up time?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1777,7 +1787,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           s"client ${request.header.clientId}.")
         responseBody
       }
-      sendResponseMaybeThrottle(request, createResponse)
+      sendResponseMaybeThrottle(controllerMutationQuota, request, createResponse, None)

Review comment:
       nit: Make the last argument a default arg or use named argument since it is not clear what None is.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1803,14 +1813,20 @@ class KafkaApis(val requestChannel: RequestChannel,
         unauthorized.map(_.name -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, "The topic authorization is failed.")) ++
         queuedForDeletion.map(_.name -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is queued for deletion."))
 
-      adminManager.createPartitions(createPartitionsRequest.data.timeoutMs,
+      adminManager.createPartitions(
+        createPartitionsRequest.data.timeoutMs,
         valid,
         createPartitionsRequest.data.validateOnly,
-        request.context.listenerName, result => sendResponseCallback(result ++ errors))
+        controllerMutationQuota,
+        result => sendResponseCallback(result ++ errors))
     }
   }
 
   def handleDeleteTopicsRequest(request: RequestChannel.Request): Unit = {
+    // Since version 5 of the API, the quota is strictly enforced. Any topic deletion
+    // above the quota is not allowed and rejected with a THROTTLING_QUOTA_EXCEEDED error.
+    val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, 5)

Review comment:
       Named argument for `strictSinceVersion`

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1777,7 +1787,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           s"client ${request.header.clientId}.")
         responseBody
       }
-      sendResponseMaybeThrottle(request, createResponse)
+      sendResponseMaybeThrottle(controllerMutationQuota, request, createResponse, None)

Review comment:
       nit: Make the last argument a default arg or use named argument since it is not clear what None is.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1746,21 +1751,26 @@ class KafkaApis(val requestChannel: RequestChannel,
             result.setConfigs(List.empty.asJava)
               .setNumPartitions(-1)
               .setReplicationFactor(-1)
-              .setTopicConfigErrorCode(0.toShort)
+              .setTopicConfigErrorCode(Errors.NONE.code)
           }
         }
         sendResponseCallback(results)
       }
-      adminManager.createTopics(createTopicsRequest.data.timeoutMs,
-          createTopicsRequest.data.validateOnly,
-          toCreate,
-          authorizedForDescribeConfigs,
-          handleCreateTopicsResults)
+      adminManager.createTopics(
+        createTopicsRequest.data.timeoutMs,
+        createTopicsRequest.data.validateOnly,
+        toCreate,
+        authorizedForDescribeConfigs,
+        controllerMutationQuota,
+        handleCreateTopicsResults)
     }
   }
 
   def handleCreatePartitionsRequest(request: RequestChannel.Request): Unit = {
     val createPartitionsRequest = request.body[CreatePartitionsRequest]
+    // Since version 3 of the API, the quota is strictly enforced. Any partition creation
+    // above the quota is not allowed and rejected with a THROTTLING_QUOTA_EXCEEDED error.
+    val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, 3)

Review comment:
       As before, use named argument for `strictSinceVersion` and comment not necessary here?

##########
File path: core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
##########
@@ -0,0 +1,361 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package kafka.server
+
+import java.util.Properties
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.TimeUnit
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.internals.KafkaFutureImpl
+import org.apache.kafka.common.message.CreatePartitionsRequestData
+import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.message.DeleteTopicsRequestData
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.quota.ClientQuotaAlteration
+import org.apache.kafka.common.quota.ClientQuotaEntity
+import org.apache.kafka.common.requests.AlterClientQuotasRequest
+import org.apache.kafka.common.requests.AlterClientQuotasResponse
+import org.apache.kafka.common.requests.CreatePartitionsRequest
+import org.apache.kafka.common.requests.CreatePartitionsResponse
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.requests.CreateTopicsResponse
+import org.apache.kafka.common.requests.DeleteTopicsRequest
+import org.apache.kafka.common.requests.DeleteTopicsResponse
+import org.apache.kafka.common.security.auth.AuthenticationContext
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import org.junit.Before
+import org.junit.Test
+
+import scala.jdk.CollectionConverters._
+
+object ControllerMutationQuotaTest {
+  // Principal used for all client connections. This is updated by each test.
+  var principal = KafkaPrincipal.ANONYMOUS
+  class TestPrincipalBuilder extends KafkaPrincipalBuilder {
+    override def build(context: AuthenticationContext): KafkaPrincipal = {
+      principal
+    }
+  }
+
+  def asPrincipal(newPrincipal: KafkaPrincipal)(f: => Unit): Unit = {
+    val currentPrincipal = principal
+    principal = newPrincipal
+    try f
+    finally principal = currentPrincipal
+  }
+
+  val ThrottledPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "ThrottledPrincipal")
+  val UnboundedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "UnboundedPrincipal")
+
+  val StrictCreateTopicsRequestVersion = ApiKeys.CREATE_TOPICS.latestVersion
+  val PermissiveCreateTopicsRequestVersion = 5.toShort
+
+  val StrictDeleteTopicsRequestVersion = ApiKeys.DELETE_TOPICS.latestVersion
+  val PermissiveDeleteTopicsRequestVersion = 4.toShort
+
+  val StrictCreatePartitionsRequestVersion = ApiKeys.CREATE_PARTITIONS.latestVersion
+  val PermissiveCreatePartitionsRequestVersion = 2.toShort
+
+  val TopicsWithOnePartition = Seq("topic-1" ->  1, "topic-2" ->  1)
+  val TopicsWith30Partitions = Seq("topic-1" -> 30, "topic-2" -> 30)
+  val TopicsWith31Partitions = Seq("topic-1" -> 31, "topic-2" -> 31)
+
+  val ControllerMutationRate = 2.0
+}
+
+class ControllerMutationQuotaTest extends BaseRequestTest {
+  import ControllerMutationQuotaTest._
+
+  override def brokerCount: Int = 1
+
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
+    properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
+    properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+    properties.put(KafkaConfig.PrincipalBuilderClassProp,
+      classOf[ControllerMutationQuotaTest.TestPrincipalBuilder].getName)
+    // We use the default number of samples and window size.
+    properties.put(KafkaConfig.NumControllerQuotaSamplesProp, "11")
+    properties.put(KafkaConfig.ControllerQuotaWindowSizeSecondsProp, "1")
+  }
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+
+    // Define a quota for ThrottledPrincipal
+    defineUserQuota(ThrottledPrincipal.getName, Some(ControllerMutationRate))
+    waitUserQuota(ThrottledPrincipal.getName, ControllerMutationRate)
+  }
+
+  @Test
+  def testSetUnsetQuota(): Unit = {
+    val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "User")
+    // Default Value
+    waitUserQuota(principal.getName, Long.MaxValue)
+    // Define a new quota
+    defineUserQuota(principal.getName, Some(ControllerMutationRate))
+    // Check it
+    waitUserQuota(principal.getName, ControllerMutationRate)
+    // Remove it
+    defineUserQuota(principal.getName, None)
+    // Back to the default
+    waitUserQuota(principal.getName, Long.MaxValue)
+  }
+
+  @Test
+  def testStrictCreateTopicsRequest(): Unit = {
+    asPrincipal(ThrottledPrincipal) {
+      // Create two topics worth of 30 partitions each. As we use a strict quota, we
+      // expect the first topic to be created and the second to be rejected.
+      // Theoretically, the throttle time should be below or equal to:
+      // ((30 / 10) - 2) / 2 * 10 = 5s
+      val (throttleTimeMs1, errors1) = createTopics(TopicsWith30Partitions, StrictCreateTopicsRequestVersion)
+      assertTrue((5000 - throttleTimeMs1) < 1000)
+      assertEquals(Seq(Errors.NONE, Errors.THROTTLING_QUOTA_EXCEEDED), errors1)
+
+      // The implementation of the Rate has NOT been changed yet so we have to wait past
+      // the window in order to get the average rate below the quota.
+      Thread.sleep(11000) // Thread.sleep(throttleTimeMs1)

Review comment:
       Can we test using mock time or a different approach that doesn't need these long sleeps?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1684,25 +1685,29 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = {
+    // Since version 6 of the API, the quota is strictly enforced. Any topic creation
+    // above the quota is not allowed and rejected with a THROTTLING_QUOTA_EXCEEDED error.
+    val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, 6)

Review comment:
       nit: `quotas.controllerMutation.newQuotaFor(request, strictSinceVersion=6)`. Then we can move the comment to the method.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org