You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/04/10 19:42:15 UTC
[kafka] branch trunk updated: MINOR: Move creation of quota callback to ensure single instance (#4848)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 79c6f7c MINOR: Move creation of quota callback to ensure single instance (#4848)
79c6f7c is described below
commit 79c6f7cd9af462807a9f6b6313fd6c6c7908951f
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue Apr 10 20:42:07 2018 +0100
MINOR: Move creation of quota callback to ensure single instance (#4848)
Move creation of the quota callback instance out of the KafkaConfig constructor and into QuotaFactory.instantiate. This avoids creating a callback instance for every KafkaConfig, since we create temporary KafkaConfigs during dynamic config updates.
Reviewers: Jun Rao <ju...@gmail.com>
---
.../main/scala/kafka/server/DynamicBrokerConfig.scala | 11 ++++++-----
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 1 -
core/src/main/scala/kafka/server/KafkaServer.scala | 1 -
core/src/main/scala/kafka/server/QuotaFactory.scala | 17 ++++++++++++-----
.../integration/kafka/api/CustomQuotaCallbackTest.scala | 4 ++++
.../test/scala/unit/kafka/server/KafkaApisTest.scala | 3 ++-
7 files changed, 25 insertions(+), 14 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 1839768..be0ed6b 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -158,7 +158,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
addBrokerReconfigurable(kafkaServer.logManager.cleaner)
addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
- addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaConfig))
+ addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))
addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
}
@@ -710,13 +710,13 @@ object DynamicListenerConfig {
)
}
-class DynamicClientQuotaCallback(brokerId: Int, config: KafkaConfig) extends Reconfigurable {
+class DynamicClientQuotaCallback(brokerId: Int, server: KafkaServer) extends Reconfigurable {
override def configure(configs: util.Map[String, _]): Unit = {}
override def reconfigurableConfigs(): util.Set[String] = {
val configs = new util.HashSet[String]()
- config.quotaCallback.foreach {
+ server.quotaManagers.clientQuotaCallback.foreach {
case callback: Reconfigurable => configs.addAll(callback.reconfigurableConfigs)
case _ =>
}
@@ -724,14 +724,15 @@ class DynamicClientQuotaCallback(brokerId: Int, config: KafkaConfig) extends Rec
}
override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
- config.quotaCallback.foreach {
+ server.quotaManagers.clientQuotaCallback.foreach {
case callback: Reconfigurable => callback.validateReconfiguration(configs)
case _ =>
}
}
override def reconfigure(configs: util.Map[String, _]): Unit = {
- config.quotaCallback.foreach {
+ val config = server.config
+ server.quotaManagers.clientQuotaCallback.foreach {
case callback: Reconfigurable =>
config.dynamicConfig.maybeReconfigure(callback, config.dynamicConfig.currentKafkaConfig, configs)
true
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f43f8a5..a0caa4a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -231,7 +231,7 @@ class KafkaApis(val requestChannel: RequestChannel,
adminManager.tryCompleteDelayedTopicOperations(topic)
}
}
- config.quotaCallback.foreach { callback =>
+ quotas.clientQuotaCallback.foreach { callback =>
if (callback.updateClusterMetadata(metadataCache.getClusterMetadata(clusterId, request.context.listenerName))) {
quotas.fetch.updateQuotaMetricConfigs()
quotas.produce.updateQuotaMetricConfigs()
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 7927f1b..4834791 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1224,7 +1224,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val replicationQuotaWindowSizeSeconds = getInt(KafkaConfig.ReplicationQuotaWindowSizeSecondsProp)
val numAlterLogDirsReplicationQuotaSamples = getInt(KafkaConfig.NumAlterLogDirsReplicationQuotaSamplesProp)
val alterLogDirsReplicationQuotaWindowSizeSeconds = getInt(KafkaConfig.AlterLogDirsReplicationQuotaWindowSizeSecondsProp)
- val quotaCallback = Option(getConfiguredInstance(KafkaConfig.ClientQuotaCallbackClassProp, classOf[ClientQuotaCallback]))
/** ********* Transaction Configuration **************/
val transactionIdExpirationMs = getInt(KafkaConfig.TransactionalIdExpirationMsProp)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f621051..7105688 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -595,7 +595,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
if (quotaManagers != null)
CoreUtils.swallow(quotaManagers.shutdown(), this)
- config.quotaCallback.foreach(_.close())
// Even though socket server is stopped much earlier, controller can generate
// response for controlled shutdown request. Shutdown server at the end to
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala
index c758b5a..1ee713b 100644
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ b/core/src/main/scala/kafka/server/QuotaFactory.scala
@@ -20,6 +20,7 @@ import kafka.server.QuotaType._
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.server.quota.ClientQuotaCallback
import org.apache.kafka.common.utils.Time
object QuotaType {
@@ -44,22 +45,28 @@ object QuotaFactory extends Logging {
request: ClientRequestQuotaManager,
leader: ReplicationQuotaManager,
follower: ReplicationQuotaManager,
- alterLogDirs: ReplicationQuotaManager) {
+ alterLogDirs: ReplicationQuotaManager,
+ clientQuotaCallback: Option[ClientQuotaCallback]) {
def shutdown() {
fetch.shutdown
produce.shutdown
request.shutdown
+ clientQuotaCallback.foreach(_.close())
}
}
def instantiate(cfg: KafkaConfig, metrics: Metrics, time: Time, threadNamePrefix: String): QuotaManagers = {
+
+ val clientQuotaCallback = Option(cfg.getConfiguredInstance(KafkaConfig.ClientQuotaCallbackClassProp,
+ classOf[ClientQuotaCallback]))
QuotaManagers(
- new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time, threadNamePrefix, cfg.quotaCallback),
- new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time, threadNamePrefix, cfg.quotaCallback),
- new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time, threadNamePrefix, cfg.quotaCallback),
+ new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time, threadNamePrefix, clientQuotaCallback),
+ new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time, threadNamePrefix, clientQuotaCallback),
+ new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time, threadNamePrefix, clientQuotaCallback),
new ReplicationQuotaManager(replicationConfig(cfg), metrics, LeaderReplication, time),
new ReplicationQuotaManager(replicationConfig(cfg), metrics, FollowerReplication, time),
- new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics, AlterLogDirsReplication, time)
+ new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics, AlterLogDirsReplication, time),
+ clientQuotaCallback
)
}
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 886d696..018971b 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -168,6 +168,8 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
TestUtils.alterConfigs(servers, adminClient, newProps, perBrokerConfig = false)
user.waitForQuotaUpdate(8000, 2500, defaultRequestQuota)
user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)
+
+ assertEquals(serverCount, callbackInstances.get)
}
/**
@@ -329,6 +331,7 @@ object GroupedUserQuotaCallback {
ClientQuotaType.FETCH -> new AtomicInteger,
ClientQuotaType.REQUEST -> new AtomicInteger
)
+ val callbackInstances = new AtomicInteger
}
/**
@@ -354,6 +357,7 @@ class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable w
override def configure(configs: util.Map[String, _]): Unit = {
brokerId = configs.get(KafkaConfig.BrokerIdProp).toString.toInt
+ callbackInstances.incrementAndGet
}
override def reconfigurableConfigs: util.Set[String] = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 5de978c..382364f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -68,7 +68,8 @@ class KafkaApisTest {
private val clientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
private val clientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
private val replicaQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
- private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager, clientRequestQuotaManager, replicaQuotaManager, replicaQuotaManager, replicaQuotaManager)
+ private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager, clientRequestQuotaManager,
+ replicaQuotaManager, replicaQuotaManager, replicaQuotaManager, None)
private val fetchManager = EasyMock.createNiceMock(classOf[FetchManager])
private val brokerTopicStats = new BrokerTopicStats
private val clusterId = "clusterId"
--
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.