You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2016/02/17 05:35:16 UTC

kafka git commit: KAFKA-2547; Make DynamicConfigManager to use the ZkNodeChangeNotifica…

Repository: kafka
Updated Branches:
  refs/heads/trunk 5df56145a -> 2faf9f60c


KAFKA-2547; Make DynamicConfigManager to use the ZkNodeChangeNotifica…

…tionListener introduced as part of KAFKA-2211

Author: Parth Brahmbhatt <br...@gmail.com>

Reviewers: Flavio Junqueira <fp...@apache.org>, Ismael Juma <is...@juma.me.uk>, Sriharsha Chintalapani <ma...@harsha.io>

Closes #679 from Parth-Brahmbhatt/KAFKA-2547 and squashes the following commits:

1722c76 [Parth Brahmbhatt] Addressing review comments.
376f77d [Parth Brahmbhatt] Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-2547
a13b963 [Parth Brahmbhatt] Addressing comments from Reviewers.
1007137 [Parth Brahmbhatt] KAFKA-2547: Make DynamicConfigManager to use the ZkNodeChangeNotificationListener introduced as part of KAFKA-2211


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2faf9f60
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2faf9f60
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2faf9f60

Branch: refs/heads/trunk
Commit: 2faf9f60c81023b4c061c4b8067c8b23cf73516e
Parents: 5df5614
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue Feb 16 20:34:59 2016 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Feb 16 20:34:59 2016 -0800

----------------------------------------------------------------------
 .../ZkNodeChangeNotificationListener.scala      | 19 +++-
 .../security/auth/SimpleAclAuthorizer.scala     | 18 ----
 .../kafka/server/DynamicConfigManager.scala     | 95 ++++----------------
 .../kafka/server/DynamicConfigChangeTest.scala  | 10 +--
 4 files changed, 40 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2faf9f60/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index 91b4fb9..baddecc 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -19,8 +19,9 @@ package kafka.common
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.utils.{Time, SystemTime, ZkUtils, Logging}
+import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.exception.ZkInterruptedException
-import org.I0Itec.zkclient.IZkChildListener
+import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener}
 import scala.collection.JavaConverters._
 
 /**
@@ -62,6 +63,7 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
   def init() {
     zkUtils.makeSurePersistentPathExists(seqNodeRoot)
     zkUtils.zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener)
+    zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener)
     processAllNotifications()
   }
 
@@ -139,5 +141,20 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
     }
   }
 
+  object ZkStateChangeListener extends IZkStateListener {
+
+    override def handleNewSession() {
+      processAllNotifications
+    }
+
+    override def handleSessionEstablishmentError(error: Throwable) {
+      fatal("Could not establish session with zookeeper", error)
+    }
+
+    override def handleStateChanged(state: KeeperState) {
+      debug(s"New zookeeper state: ${state}")
+    }
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2faf9f60/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index f716e16..77e23f8 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -107,8 +107,6 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath)
     aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler)
     aclChangeListener.init()
-
-    zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener)
   }
 
   override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
@@ -278,20 +276,4 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       updateCache(resource, acls)
     }
   }
-
-  object ZkStateChangeListener extends IZkStateListener {
-
-    override def handleNewSession() {
-      aclChangeListener.processAllNotifications
-    }
-
-    override def handleSessionEstablishmentError(error: Throwable) {
-      fatal("Could not establish session with zookeeper", error)
-    }
-
-    override def handleStateChanged(state: KeeperState) {
-      //no op
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2faf9f60/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index cb4b8f1..eb406af 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -17,15 +17,18 @@
 
 package kafka.server
 
+import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
+import kafka.security.auth.Resource
 import kafka.utils.Json
 import kafka.utils.Logging
 import kafka.utils.SystemTime
 import kafka.utils.Time
 import kafka.utils.ZkUtils
+import org.apache.zookeeper.Watcher.Event.KeeperState
 
 import scala.collection._
 import kafka.admin.AdminUtils
-import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
+import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 
 
 /**
@@ -76,62 +79,21 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
                            private val time: Time = SystemTime) extends Logging {
   private var lastExecutedChange = -1L
 
-  /**
-   * Begin watching for config changes
-   */
-  def startup() {
-    zkUtils.makeSurePersistentPathExists(ZkUtils.EntityConfigChangesPath)
-    zkUtils.zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener)
-    processAllConfigChanges()
-  }
-
-  /**
-   * Process all config changes
-   */
-  private def processAllConfigChanges() {
-    val configChanges = zkUtils.zkClient.getChildren(ZkUtils.EntityConfigChangesPath)
-    import JavaConversions._
-    processConfigChanges((configChanges: mutable.Buffer[String]).sorted)
-  }
-
-  /**
-   * Process the given list of config changes
-   */
-  private def processConfigChanges(notifications: Seq[String]) {
-    if (notifications.size > 0) {
-      info("Processing config change notification(s)...")
-      val now = time.milliseconds
-      for (notification <- notifications) {
-        val changeId = changeNumber(notification)
-
-        if (changeId > lastExecutedChange) {
-          val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification
-
-          val (jsonOpt, stat) = zkUtils.readDataMaybeNull(changeZnode)
-          processNotification(jsonOpt)
-        }
-        lastExecutedChange = changeId
-      }
-      purgeObsoleteNotifications(now, notifications)
-    }
-  }
-
-  def processNotification(jsonOpt: Option[String]) = {
-    if(jsonOpt.isDefined) {
-      val json = jsonOpt.get
+  object ConfigChangedNotificationHandler extends NotificationHandler {
+    override def processNotification(json: String) = {
       Json.parseFull(json) match {
         case None => // There are no config overrides.
-          // Ignore non-json notifications because they can be from the deprecated TopicConfigManager
+        // Ignore non-json notifications because they can be from the deprecated TopicConfigManager
         case Some(mapAnon: Map[_, _]) =>
           val map = mapAnon collect
-                  { case (k: String, v: Any) => k -> v }
+            { case (k: String, v: Any) => k -> v }
           require(map("version") == 1)
 
           val entityType = map.get("entity_type") match {
             case Some(ConfigType.Topic) => ConfigType.Topic
             case Some(ConfigType.Client) => ConfigType.Client
             case _ => throw new IllegalArgumentException("Config change notification must have 'entity_type' set to either 'client' or 'topic'." +
-                    " Received: " + json)
+              " Received: " + json)
           }
 
           val entity = map.get("entity_name") match {
@@ -143,43 +105,20 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
           configHandlers(entityType).processConfigChanges(entity, entityConfig)
 
         case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
-                                                             "{\"version\" : 1," +
-                                                             " \"entity_type\":\"topic/client\"," +
-                                                             " \"entity_name\" : \"topic_name/client_id\"}." +
-                                                             " Received: " + json)
-      }
-    }
-  }
-
-  private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) {
-    for(notification <- notifications.sorted) {
-      val (jsonOpt, stat) = zkUtils.readDataMaybeNull(ZkUtils.EntityConfigChangesPath + "/" + notification)
-      if(jsonOpt.isDefined) {
-        val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification
-        if (now - stat.getCtime > changeExpirationMs) {
-          debug("Purging config change notification " + notification)
-          zkUtils.deletePath(changeZnode)
-        } else {
-          return
-        }
+          "{\"version\" : 1," +
+          " \"entity_type\":\"topic/client\"," +
+          " \"entity_name\" : \"topic_name/client_id\"}." +
+          " Received: " + json)
       }
     }
   }
 
-  /* get the change number from a change notification znode */
-  private def changeNumber(name: String): Long = name.substring(AdminUtils.EntityConfigChangeZnodePrefix.length).toLong
+  private val configChangeListener = new ZkNodeChangeNotificationListener(zkUtils, ZkUtils.EntityConfigChangesPath, AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificationHandler)
 
   /**
-   * A listener that applies config changes to logs
+   * Begin watching for config changes
    */
-  object ConfigChangeListener extends IZkChildListener {
-    override def handleChildChange(path: String, chillins: java.util.List[String]) {
-      try {
-        import JavaConversions._
-        processConfigChanges(chillins: mutable.Buffer[String])
-      } catch {
-        case e: Exception => error("Error processing config change:", e)
-      }
-    }
+  def startup(): Unit = {
+    configChangeListener.init()
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2faf9f60/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 8984f17..d1ad3a3 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -109,12 +109,12 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
 
     val configManager = new DynamicConfigManager(zkUtils, Map(ConfigType.Topic -> handler))
     // Notifications created using the old TopicConfigManager are ignored.
-    configManager.processNotification(Some("not json"))
+    configManager.ConfigChangedNotificationHandler.processNotification("not json")
 
     // Incorrect Map. No version
     try {
       val jsonMap = Map("v" -> 1, "x" -> 2)
-      configManager.processNotification(Some(Json.encode(jsonMap)))
+      configManager.ConfigChangedNotificationHandler.processNotification(Json.encode(jsonMap))
       fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
     }
     catch {
@@ -123,7 +123,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     // Version is provided. EntityType is incorrect
     try {
       val jsonMap = Map("version" -> 1, "entity_type" -> "garbage", "entity_name" -> "x")
-      configManager.processNotification(Some(Json.encode(jsonMap)))
+      configManager.ConfigChangedNotificationHandler.processNotification(Json.encode(jsonMap))
       fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
     }
     catch {
@@ -133,7 +133,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     // EntityName isn't provided
     try {
       val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic)
-      configManager.processNotification(Some(Json.encode(jsonMap)))
+      configManager.ConfigChangedNotificationHandler.processNotification(Json.encode(jsonMap))
       fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
     }
     catch {
@@ -142,7 +142,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
 
     // Everything is provided
     val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic, "entity_name" -> "x")
-    configManager.processNotification(Some(Json.encode(jsonMap)))
+    configManager.ConfigChangedNotificationHandler.processNotification(Json.encode(jsonMap))
 
     // Verify that processConfigChanges was only called once
     EasyMock.verify(handler)