You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/04/10 19:42:15 UTC

[kafka] branch trunk updated: MINOR: Move creation of quota callback to ensure single instance (#4848)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 79c6f7c  MINOR: Move creation of quota callback to ensure single instance (#4848)
79c6f7c is described below

commit 79c6f7cd9af462807a9f6b6313fd6c6c7908951f
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue Apr 10 20:42:07 2018 +0100

    MINOR: Move creation of quota callback to ensure single instance (#4848)
    
    Move creation of quota callback instance out of KafkaConfig constructor to QuotaFactory.instantiate to avoid creating a callback instance for every KafkaConfig since we create temporary KafkaConfigs during dynamic config updates.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 .../main/scala/kafka/server/DynamicBrokerConfig.scala   | 11 ++++++-----
 core/src/main/scala/kafka/server/KafkaApis.scala        |  2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala      |  1 -
 core/src/main/scala/kafka/server/KafkaServer.scala      |  1 -
 core/src/main/scala/kafka/server/QuotaFactory.scala     | 17 ++++++++++++-----
 .../integration/kafka/api/CustomQuotaCallbackTest.scala |  4 ++++
 .../test/scala/unit/kafka/server/KafkaApisTest.scala    |  3 ++-
 7 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 1839768..be0ed6b 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -158,7 +158,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       addBrokerReconfigurable(kafkaServer.logManager.cleaner)
     addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
     addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
-    addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaConfig))
+    addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))
     addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
   }
 
@@ -710,13 +710,13 @@ object DynamicListenerConfig {
   )
 }
 
-class DynamicClientQuotaCallback(brokerId: Int, config: KafkaConfig) extends Reconfigurable {
+class DynamicClientQuotaCallback(brokerId: Int, server: KafkaServer) extends Reconfigurable {
 
   override def configure(configs: util.Map[String, _]): Unit = {}
 
   override def reconfigurableConfigs(): util.Set[String] = {
     val configs = new util.HashSet[String]()
-    config.quotaCallback.foreach {
+    server.quotaManagers.clientQuotaCallback.foreach {
       case callback: Reconfigurable => configs.addAll(callback.reconfigurableConfigs)
       case _ =>
     }
@@ -724,14 +724,15 @@ class DynamicClientQuotaCallback(brokerId: Int, config: KafkaConfig) extends Rec
   }
 
   override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
-    config.quotaCallback.foreach {
+    server.quotaManagers.clientQuotaCallback.foreach {
       case callback: Reconfigurable => callback.validateReconfiguration(configs)
       case _ =>
     }
   }
 
   override def reconfigure(configs: util.Map[String, _]): Unit = {
-    config.quotaCallback.foreach {
+    val config = server.config
+    server.quotaManagers.clientQuotaCallback.foreach {
       case callback: Reconfigurable =>
         config.dynamicConfig.maybeReconfigure(callback, config.dynamicConfig.currentKafkaConfig, configs)
         true
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f43f8a5..a0caa4a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -231,7 +231,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           adminManager.tryCompleteDelayedTopicOperations(topic)
         }
       }
-      config.quotaCallback.foreach { callback =>
+      quotas.clientQuotaCallback.foreach { callback =>
         if (callback.updateClusterMetadata(metadataCache.getClusterMetadata(clusterId, request.context.listenerName))) {
           quotas.fetch.updateQuotaMetricConfigs()
           quotas.produce.updateQuotaMetricConfigs()
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 7927f1b..4834791 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1224,7 +1224,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val replicationQuotaWindowSizeSeconds = getInt(KafkaConfig.ReplicationQuotaWindowSizeSecondsProp)
   val numAlterLogDirsReplicationQuotaSamples = getInt(KafkaConfig.NumAlterLogDirsReplicationQuotaSamplesProp)
   val alterLogDirsReplicationQuotaWindowSizeSeconds = getInt(KafkaConfig.AlterLogDirsReplicationQuotaWindowSizeSecondsProp)
-  val quotaCallback = Option(getConfiguredInstance(KafkaConfig.ClientQuotaCallbackClassProp, classOf[ClientQuotaCallback]))
 
   /** ********* Transaction Configuration **************/
   val transactionIdExpirationMs = getInt(KafkaConfig.TransactionalIdExpirationMsProp)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f621051..7105688 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -595,7 +595,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
         if (quotaManagers != null)
           CoreUtils.swallow(quotaManagers.shutdown(), this)
-        config.quotaCallback.foreach(_.close())
 
         // Even though socket server is stopped much earlier, controller can generate
         // response for controlled shutdown request. Shutdown server at the end to
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala
index c758b5a..1ee713b 100644
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ b/core/src/main/scala/kafka/server/QuotaFactory.scala
@@ -20,6 +20,7 @@ import kafka.server.QuotaType._
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.server.quota.ClientQuotaCallback
 import org.apache.kafka.common.utils.Time
 
 object QuotaType  {
@@ -44,22 +45,28 @@ object QuotaFactory extends Logging {
                            request: ClientRequestQuotaManager,
                            leader: ReplicationQuotaManager,
                            follower: ReplicationQuotaManager,
-                           alterLogDirs: ReplicationQuotaManager) {
+                           alterLogDirs: ReplicationQuotaManager,
+                           clientQuotaCallback: Option[ClientQuotaCallback]) {
     def shutdown() {
       fetch.shutdown
       produce.shutdown
       request.shutdown
+      clientQuotaCallback.foreach(_.close())
     }
   }
 
   def instantiate(cfg: KafkaConfig, metrics: Metrics, time: Time, threadNamePrefix: String): QuotaManagers = {
+
+    val clientQuotaCallback = Option(cfg.getConfiguredInstance(KafkaConfig.ClientQuotaCallbackClassProp,
+      classOf[ClientQuotaCallback]))
     QuotaManagers(
-      new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time, threadNamePrefix, cfg.quotaCallback),
-      new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time, threadNamePrefix, cfg.quotaCallback),
-      new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time, threadNamePrefix, cfg.quotaCallback),
+      new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time, threadNamePrefix, clientQuotaCallback),
+      new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time, threadNamePrefix, clientQuotaCallback),
+      new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time, threadNamePrefix, clientQuotaCallback),
       new ReplicationQuotaManager(replicationConfig(cfg), metrics, LeaderReplication, time),
       new ReplicationQuotaManager(replicationConfig(cfg), metrics, FollowerReplication, time),
-      new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics, AlterLogDirsReplication, time)
+      new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics, AlterLogDirsReplication, time),
+      clientQuotaCallback
     )
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 886d696..018971b 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -168,6 +168,8 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
     TestUtils.alterConfigs(servers, adminClient, newProps, perBrokerConfig = false)
     user.waitForQuotaUpdate(8000, 2500, defaultRequestQuota)
     user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)
+
+    assertEquals(serverCount, callbackInstances.get)
   }
 
   /**
@@ -329,6 +331,7 @@ object GroupedUserQuotaCallback {
     ClientQuotaType.FETCH -> new AtomicInteger,
     ClientQuotaType.REQUEST -> new AtomicInteger
   )
+  val callbackInstances = new AtomicInteger
 }
 
 /**
@@ -354,6 +357,7 @@ class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable w
 
   override def configure(configs: util.Map[String, _]): Unit = {
     brokerId = configs.get(KafkaConfig.BrokerIdProp).toString.toInt
+    callbackInstances.incrementAndGet
   }
 
   override def reconfigurableConfigs: util.Set[String] = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 5de978c..382364f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -68,7 +68,8 @@ class KafkaApisTest {
   private val clientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
   private val clientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
   private val replicaQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
-  private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager, clientRequestQuotaManager, replicaQuotaManager, replicaQuotaManager, replicaQuotaManager)
+  private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager, clientRequestQuotaManager,
+    replicaQuotaManager, replicaQuotaManager, replicaQuotaManager, None)
   private val fetchManager = EasyMock.createNiceMock(classOf[FetchManager])
   private val brokerTopicStats = new BrokerTopicStats
   private val clusterId = "clusterId"

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.