You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/10 18:54:18 UTC

[GitHub] [kafka] mumrah opened a new pull request #10101: Upstream ClientQuotaMetadataManager

mumrah opened a new pull request #10101:
URL: https://github.com/apache/kafka/pull/10101


   This PR brings in the new broker metadata processor for handling QuotaRecord-s coming from the metadata log. Also included is a new cache class to allow for fast lookups of quotas on the broker for handling DescribeClientQuotaRequest. 
   
   See the original PR here https://github.com/confluentinc/kafka/pull/477


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10101: Upstream ClientQuotaMetadataManager

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10101:
URL: https://github.com/apache/kafka/pull/10101#discussion_r574003841



##########
File path: core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
##########
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.network.ConnectionQuotas
+import kafka.server.ConfigEntityName
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Logging
+import org.apache.kafka.common.config.internals.QuotaConfigs
+import org.apache.kafka.common.metadata.QuotaRecord
+import org.apache.kafka.common.metrics.Quota
+import org.apache.kafka.common.quota.ClientQuotaEntity
+import org.apache.kafka.common.utils.Sanitizer
+
+import java.net.{InetAddress, UnknownHostException}
+import scala.collection.mutable
+
+
+// A strict hierarchy of entities that we support
+sealed trait QuotaEntity
+case class IpEntity(ip: String) extends QuotaEntity
+case object DefaultIpEntity extends QuotaEntity
+case class UserEntity(user: String) extends QuotaEntity
+case object DefaultUserEntity extends QuotaEntity
+case class ClientIdEntity(clientId: String) extends QuotaEntity
+case object DefaultClientIdEntity extends QuotaEntity
+case class ExplicitUserExplicitClientIdEntity(user: String, clientId: String) extends QuotaEntity
+case class ExplicitUserDefaultClientIdEntity(user: String) extends QuotaEntity
+case class DefaultUserExplicitClientIdEntity(clientId: String) extends QuotaEntity
+case object DefaultUserDefaultClientIdEntity extends QuotaEntity
+
+/**
+ * Watch for changes to quotas in the metadata log and update quota managers and cache as necessary
+ */
+class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManagers,
+                                 private[metadata] val connectionQuotas: ConnectionQuotas,
+                                 private[metadata] val quotaCache: ClientQuotaCache) extends Logging {
+
+  def handleQuotaRecord(quotaRecord: QuotaRecord): Unit = {
+    val entityMap = mutable.Map[String, String]()
+    quotaRecord.entity().forEach { entityData =>
+      entityMap.put(entityData.entityType(), entityData.entityName())
+    }
+
+    if (entityMap.contains(ClientQuotaEntity.IP)) {
+      // In the IP quota manager, None is used for default entity
+      val ipEntity = Option(entityMap(ClientQuotaEntity.IP)) match {
+        case Some(ip) => IpEntity(ip)
+        case None => DefaultIpEntity
+      }
+      handleIpQuota(ipEntity, quotaRecord)
+    } else if (entityMap.contains(ClientQuotaEntity.USER) || entityMap.contains(ClientQuotaEntity.CLIENT_ID)) {
+      // Need to handle null values for default entity name, so use "getOrElse" combined with "contains" checks
+      val userVal = entityMap.getOrElse(ClientQuotaEntity.USER, null)
+      val clientIdVal = entityMap.getOrElse(ClientQuotaEntity.CLIENT_ID, null)
+
+      // In User+Client quota managers, "<default>" is used for default entity, so we need to represent all possible
+      // combinations of values, defaults, and absent entities
+      val userClientEntity = if (entityMap.contains(ClientQuotaEntity.USER) && entityMap.contains(ClientQuotaEntity.CLIENT_ID)) {
+        if (userVal == null && clientIdVal == null) {
+          DefaultUserDefaultClientIdEntity
+        } else if (userVal == null) {
+          DefaultUserExplicitClientIdEntity(clientIdVal)
+        } else if (clientIdVal == null) {
+          ExplicitUserDefaultClientIdEntity(userVal)
+        } else {
+          ExplicitUserExplicitClientIdEntity(userVal, clientIdVal)
+        }
+      } else if (entityMap.contains(ClientQuotaEntity.USER)) {
+        if (userVal == null) {
+          DefaultUserEntity
+        } else {
+          UserEntity(userVal)
+        }
+      } else {
+        if (clientIdVal == null) {
+          DefaultClientIdEntity
+        } else {
+          ClientIdEntity(clientIdVal)
+        }
+      }
+      handleUserClientQuota(
+        userClientEntity,
+        quotaRecord
+      )
+    } else {
+      warn(s"Ignoring unsupported quota entity ${quotaRecord.entity()}")
+    }
+  }
+
+  def handleIpQuota(ipEntity: QuotaEntity, quotaRecord: QuotaRecord): Unit = {
+    val inetAddress = ipEntity match {
+      case IpEntity(ip) =>
+        try {
+          Some(InetAddress.getByName(ip))
+        } catch {
+          case _: UnknownHostException => throw new IllegalArgumentException(s"Unable to resolve address $ip")
+        }
+      case DefaultIpEntity => None
+      case _ => throw new IllegalStateException("Should only handle IP quota entities here")
+    }
+
+    // The connection quota only understands the connection rate limit
+    if (quotaRecord.key() != QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG) {
+      warn(s"Ignoring unexpected quota key ${quotaRecord.key()} for entity $ipEntity")
+      return
+    }
+
+    // Update the cache
+    quotaCache.updateQuotaCache(ipEntity, quotaRecord.key, quotaRecord.value, quotaRecord.remove)
+
+    // Convert the value to an appropriate Option for the quota manager
+    val newValue = if (quotaRecord.remove()) {
+      None
+    } else {
+      Some(quotaRecord.value).map(_.toInt)
+    }
+    connectionQuotas.updateIpConnectionRateQuota(inetAddress, newValue)
+  }
+
+  def handleUserClientQuota(quotaEntity: QuotaEntity, quotaRecord: QuotaRecord): Unit = {
+    val manager = quotaRecord.key() match {
+      case QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG => quotaManagers.fetch
+      case QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG => quotaManagers.produce
+      case QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG => quotaManagers.request
+      case QuotaConfigs.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG => quotaManagers.controllerMutation
+      case _ =>
+        warn(s"Ignoring unexpected quota key ${quotaRecord.key()} for entity $quotaEntity")
+        return
+    }
+
+    quotaCache.updateQuotaCache(quotaEntity, quotaRecord.key, quotaRecord.value, quotaRecord.remove)

Review comment:
       does it make sense to do this update at the end, in case there's an exception in between?  or is that too paranoid




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10101: Upstream ClientQuotaMetadataManager

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10101:
URL: https://github.com/apache/kafka/pull/10101#discussion_r574001340



##########
File path: core/src/main/scala/kafka/server/metadata/ClientQuotaCache.scala
##########
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
+import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException}
+import org.apache.kafka.common.quota.{ClientQuotaEntity, ClientQuotaFilterComponent}
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+
+// A type for the cache index keys
+sealed trait CacheIndexKey
+case object DefaultUser extends CacheIndexKey
+case class SpecificUser(user: String) extends CacheIndexKey
+case object DefaultClientId extends CacheIndexKey
+case class SpecificClientId(clientId: String) extends CacheIndexKey
+case object DefaultIp extends CacheIndexKey
+case class SpecificIp(ip: String) extends CacheIndexKey
+
+
+// Different types of matching constraints
+sealed trait QuotaMatch
+case class ExactMatch(entityName: String) extends QuotaMatch
+case object DefaultMatch extends QuotaMatch
+case object TypeMatch extends QuotaMatch
+
+
+class ClientQuotaCache {
+  private type QuotaCacheIndex = mutable.HashMap[CacheIndexKey, mutable.HashSet[QuotaEntity]]
+
+  // A mapping of the quota entities to their quotas, for example:
+  // {
+  //   (user:alice) -> {consumer_byte_rate: 10000},
+  //   (user:alice,client:x) -> {consumer_byte_rate: 8000, producer_byte_rate: 8000}
+  // }

Review comment:
       can we do this as JavaDoc (and maybe use a unicode arrow, while you're at it, to avoid the issues JavaDoc always has with brackets)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #10101: Add ClientQuotaMetadataManager for processing QuotaRecord

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #10101:
URL: https://github.com/apache/kafka/pull/10101


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10101: Upstream ClientQuotaMetadataManager

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10101:
URL: https://github.com/apache/kafka/pull/10101#discussion_r574006200



##########
File path: core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
##########
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.network.ConnectionQuotas
+import kafka.server.ConfigEntityName
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Logging
+import org.apache.kafka.common.config.internals.QuotaConfigs
+import org.apache.kafka.common.metadata.QuotaRecord
+import org.apache.kafka.common.metrics.Quota
+import org.apache.kafka.common.quota.ClientQuotaEntity
+import org.apache.kafka.common.utils.Sanitizer
+
+import java.net.{InetAddress, UnknownHostException}
+import scala.collection.mutable
+
+
+// A strict hierarchy of entities that we support
+sealed trait QuotaEntity
+case class IpEntity(ip: String) extends QuotaEntity
+case object DefaultIpEntity extends QuotaEntity
+case class UserEntity(user: String) extends QuotaEntity
+case object DefaultUserEntity extends QuotaEntity
+case class ClientIdEntity(clientId: String) extends QuotaEntity
+case object DefaultClientIdEntity extends QuotaEntity
+case class ExplicitUserExplicitClientIdEntity(user: String, clientId: String) extends QuotaEntity
+case class ExplicitUserDefaultClientIdEntity(user: String) extends QuotaEntity
+case class DefaultUserExplicitClientIdEntity(clientId: String) extends QuotaEntity
+case object DefaultUserDefaultClientIdEntity extends QuotaEntity
+
+/**
+ * Watch for changes to quotas in the metadata log and update quota managers and cache as necessary
+ */
+class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManagers,
+                                 private[metadata] val connectionQuotas: ConnectionQuotas,
+                                 private[metadata] val quotaCache: ClientQuotaCache) extends Logging {
+
+  def handleQuotaRecord(quotaRecord: QuotaRecord): Unit = {
+    val entityMap = mutable.Map[String, String]()
+    quotaRecord.entity().forEach { entityData =>
+      entityMap.put(entityData.entityType(), entityData.entityName())
+    }
+
+    if (entityMap.contains(ClientQuotaEntity.IP)) {
+      // In the IP quota manager, None is used for default entity
+      val ipEntity = Option(entityMap(ClientQuotaEntity.IP)) match {
+        case Some(ip) => IpEntity(ip)
+        case None => DefaultIpEntity
+      }
+      handleIpQuota(ipEntity, quotaRecord)
+    } else if (entityMap.contains(ClientQuotaEntity.USER) || entityMap.contains(ClientQuotaEntity.CLIENT_ID)) {
+      // Need to handle null values for default entity name, so use "getOrElse" combined with "contains" checks
+      val userVal = entityMap.getOrElse(ClientQuotaEntity.USER, null)
+      val clientIdVal = entityMap.getOrElse(ClientQuotaEntity.CLIENT_ID, null)
+
+      // In User+Client quota managers, "<default>" is used for default entity, so we need to represent all possible
+      // combinations of values, defaults, and absent entities
+      val userClientEntity = if (entityMap.contains(ClientQuotaEntity.USER) && entityMap.contains(ClientQuotaEntity.CLIENT_ID)) {
+        if (userVal == null && clientIdVal == null) {
+          DefaultUserDefaultClientIdEntity
+        } else if (userVal == null) {
+          DefaultUserExplicitClientIdEntity(clientIdVal)
+        } else if (clientIdVal == null) {
+          ExplicitUserDefaultClientIdEntity(userVal)
+        } else {
+          ExplicitUserExplicitClientIdEntity(userVal, clientIdVal)
+        }
+      } else if (entityMap.contains(ClientQuotaEntity.USER)) {
+        if (userVal == null) {
+          DefaultUserEntity
+        } else {
+          UserEntity(userVal)
+        }
+      } else {
+        if (clientIdVal == null) {
+          DefaultClientIdEntity
+        } else {
+          ClientIdEntity(clientIdVal)
+        }
+      }
+      handleUserClientQuota(
+        userClientEntity,
+        quotaRecord
+      )
+    } else {
+      warn(s"Ignoring unsupported quota entity ${quotaRecord.entity()}")
+    }
+  }
+
+  def handleIpQuota(ipEntity: QuotaEntity, quotaRecord: QuotaRecord): Unit = {
+    val inetAddress = ipEntity match {
+      case IpEntity(ip) =>
+        try {
+          Some(InetAddress.getByName(ip))
+        } catch {
+          case _: UnknownHostException => throw new IllegalArgumentException(s"Unable to resolve address $ip")
+        }
+      case DefaultIpEntity => None
+      case _ => throw new IllegalStateException("Should only handle IP quota entities here")
+    }
+
+    // The connection quota only understands the connection rate limit
+    if (quotaRecord.key() != QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG) {
+      warn(s"Ignoring unexpected quota key ${quotaRecord.key()} for entity $ipEntity")
+      return
+    }
+
+    // Update the cache
+    quotaCache.updateQuotaCache(ipEntity, quotaRecord.key, quotaRecord.value, quotaRecord.remove)
+
+    // Convert the value to an appropriate Option for the quota manager
+    val newValue = if (quotaRecord.remove()) {
+      None
+    } else {
+      Some(quotaRecord.value).map(_.toInt)
+    }
+    connectionQuotas.updateIpConnectionRateQuota(inetAddress, newValue)
+  }
+
+  def handleUserClientQuota(quotaEntity: QuotaEntity, quotaRecord: QuotaRecord): Unit = {
+    val manager = quotaRecord.key() match {
+      case QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG => quotaManagers.fetch
+      case QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG => quotaManagers.produce
+      case QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG => quotaManagers.request
+      case QuotaConfigs.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG => quotaManagers.controllerMutation
+      case _ =>
+        warn(s"Ignoring unexpected quota key ${quotaRecord.key()} for entity $quotaEntity")
+        return
+    }
+
+    quotaCache.updateQuotaCache(quotaEntity, quotaRecord.key, quotaRecord.value, quotaRecord.remove)

Review comment:
       Actually that seems pretty reasonable. The quota managers could theoretically fail to accept and apply a quota for whatever reason. And in that case we probably don't want to cache the quota




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org