You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2016/02/17 05:35:16 UTC
kafka git commit: KAFKA-2547; Make DynamicConfigManager to use the ZkNodeChangeNotifica…
Repository: kafka
Updated Branches:
refs/heads/trunk 5df56145a -> 2faf9f60c
KAFKA-2547; Make DynamicConfigManager to use the ZkNodeChangeNotifica…
…tionListener introduced as part of KAFKA-2211
Author: Parth Brahmbhatt <br...@gmail.com>
Reviewers: Flavio Junqueira <fp...@apache.org>, Ismael Juma <is...@juma.me.uk>, Sriharsha Chintalapani <ma...@harsha.io>
Closes #679 from Parth-Brahmbhatt/KAFKA-2547 and squashes the following commits:
1722c76 [Parth Brahmbhatt] Addressing review comments.
376f77d [Parth Brahmbhatt] Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-2547
a13b963 [Parth Brahmbhatt] Addressing comments from Reviewers.
1007137 [Parth Brahmbhatt] KAFKA-2547: Make DynamicConfigManager to use the ZkNodeChangeNotificationListener introduced as part of KAFKA-2211
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2faf9f60
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2faf9f60
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2faf9f60
Branch: refs/heads/trunk
Commit: 2faf9f60c81023b4c061c4b8067c8b23cf73516e
Parents: 5df5614
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue Feb 16 20:34:59 2016 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Feb 16 20:34:59 2016 -0800
----------------------------------------------------------------------
.../ZkNodeChangeNotificationListener.scala | 19 +++-
.../security/auth/SimpleAclAuthorizer.scala | 18 ----
.../kafka/server/DynamicConfigManager.scala | 95 ++++----------------
.../kafka/server/DynamicConfigChangeTest.scala | 10 +--
4 files changed, 40 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2faf9f60/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index 91b4fb9..baddecc 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -19,8 +19,9 @@ package kafka.common
import java.util.concurrent.atomic.AtomicBoolean
import kafka.utils.{Time, SystemTime, ZkUtils, Logging}
+import org.apache.zookeeper.Watcher.Event.KeeperState
import org.I0Itec.zkclient.exception.ZkInterruptedException
-import org.I0Itec.zkclient.IZkChildListener
+import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener}
import scala.collection.JavaConverters._
/**
@@ -62,6 +63,7 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
def init() {
zkUtils.makeSurePersistentPathExists(seqNodeRoot)
zkUtils.zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener)
+ zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener)
processAllNotifications()
}
@@ -139,5 +141,20 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
}
}
+ object ZkStateChangeListener extends IZkStateListener {
+
+ override def handleNewSession() {
+ processAllNotifications
+ }
+
+ override def handleSessionEstablishmentError(error: Throwable) {
+ fatal("Could not establish session with zookeeper", error)
+ }
+
+ override def handleStateChanged(state: KeeperState) {
+ debug(s"New zookeeper state: ${state}")
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2faf9f60/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index f716e16..77e23f8 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -107,8 +107,6 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath)
aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler)
aclChangeListener.init()
-
- zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener)
}
override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
@@ -278,20 +276,4 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
updateCache(resource, acls)
}
}
-
- object ZkStateChangeListener extends IZkStateListener {
-
- override def handleNewSession() {
- aclChangeListener.processAllNotifications
- }
-
- override def handleSessionEstablishmentError(error: Throwable) {
- fatal("Could not establish session with zookeeper", error)
- }
-
- override def handleStateChanged(state: KeeperState) {
- //no op
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2faf9f60/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index cb4b8f1..eb406af 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -17,15 +17,18 @@
package kafka.server
+import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
+import kafka.security.auth.Resource
import kafka.utils.Json
import kafka.utils.Logging
import kafka.utils.SystemTime
import kafka.utils.Time
import kafka.utils.ZkUtils
+import org.apache.zookeeper.Watcher.Event.KeeperState
import scala.collection._
import kafka.admin.AdminUtils
-import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
+import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
/**
@@ -76,62 +79,21 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
private val time: Time = SystemTime) extends Logging {
private var lastExecutedChange = -1L
- /**
- * Begin watching for config changes
- */
- def startup() {
- zkUtils.makeSurePersistentPathExists(ZkUtils.EntityConfigChangesPath)
- zkUtils.zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener)
- processAllConfigChanges()
- }
-
- /**
- * Process all config changes
- */
- private def processAllConfigChanges() {
- val configChanges = zkUtils.zkClient.getChildren(ZkUtils.EntityConfigChangesPath)
- import JavaConversions._
- processConfigChanges((configChanges: mutable.Buffer[String]).sorted)
- }
-
- /**
- * Process the given list of config changes
- */
- private def processConfigChanges(notifications: Seq[String]) {
- if (notifications.size > 0) {
- info("Processing config change notification(s)...")
- val now = time.milliseconds
- for (notification <- notifications) {
- val changeId = changeNumber(notification)
-
- if (changeId > lastExecutedChange) {
- val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification
-
- val (jsonOpt, stat) = zkUtils.readDataMaybeNull(changeZnode)
- processNotification(jsonOpt)
- }
- lastExecutedChange = changeId
- }
- purgeObsoleteNotifications(now, notifications)
- }
- }
-
- def processNotification(jsonOpt: Option[String]) = {
- if(jsonOpt.isDefined) {
- val json = jsonOpt.get
+ object ConfigChangedNotificationHandler extends NotificationHandler {
+ override def processNotification(json: String) = {
Json.parseFull(json) match {
case None => // There are no config overrides.
- // Ignore non-json notifications because they can be from the deprecated TopicConfigManager
+ // Ignore non-json notifications because they can be from the deprecated TopicConfigManager
case Some(mapAnon: Map[_, _]) =>
val map = mapAnon collect
- { case (k: String, v: Any) => k -> v }
+ { case (k: String, v: Any) => k -> v }
require(map("version") == 1)
val entityType = map.get("entity_type") match {
case Some(ConfigType.Topic) => ConfigType.Topic
case Some(ConfigType.Client) => ConfigType.Client
case _ => throw new IllegalArgumentException("Config change notification must have 'entity_type' set to either 'client' or 'topic'." +
- " Received: " + json)
+ " Received: " + json)
}
val entity = map.get("entity_name") match {
@@ -143,43 +105,20 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
configHandlers(entityType).processConfigChanges(entity, entityConfig)
case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
- "{\"version\" : 1," +
- " \"entity_type\":\"topic/client\"," +
- " \"entity_name\" : \"topic_name/client_id\"}." +
- " Received: " + json)
- }
- }
- }
-
- private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) {
- for(notification <- notifications.sorted) {
- val (jsonOpt, stat) = zkUtils.readDataMaybeNull(ZkUtils.EntityConfigChangesPath + "/" + notification)
- if(jsonOpt.isDefined) {
- val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification
- if (now - stat.getCtime > changeExpirationMs) {
- debug("Purging config change notification " + notification)
- zkUtils.deletePath(changeZnode)
- } else {
- return
- }
+ "{\"version\" : 1," +
+ " \"entity_type\":\"topic/client\"," +
+ " \"entity_name\" : \"topic_name/client_id\"}." +
+ " Received: " + json)
}
}
}
- /* get the change number from a change notification znode */
- private def changeNumber(name: String): Long = name.substring(AdminUtils.EntityConfigChangeZnodePrefix.length).toLong
+ private val configChangeListener = new ZkNodeChangeNotificationListener(zkUtils, ZkUtils.EntityConfigChangesPath, AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificationHandler)
/**
- * A listener that applies config changes to logs
+ * Begin watching for config changes
*/
- object ConfigChangeListener extends IZkChildListener {
- override def handleChildChange(path: String, chillins: java.util.List[String]) {
- try {
- import JavaConversions._
- processConfigChanges(chillins: mutable.Buffer[String])
- } catch {
- case e: Exception => error("Error processing config change:", e)
- }
- }
+ def startup(): Unit = {
+ configChangeListener.init()
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2faf9f60/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 8984f17..d1ad3a3 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -109,12 +109,12 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
val configManager = new DynamicConfigManager(zkUtils, Map(ConfigType.Topic -> handler))
// Notifications created using the old TopicConfigManager are ignored.
- configManager.processNotification(Some("not json"))
+ configManager.ConfigChangedNotificationHandler.processNotification("not json")
// Incorrect Map. No version
try {
val jsonMap = Map("v" -> 1, "x" -> 2)
- configManager.processNotification(Some(Json.encode(jsonMap)))
+ configManager.ConfigChangedNotificationHandler.processNotification(Json.encode(jsonMap))
fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
}
catch {
@@ -123,7 +123,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
// Version is provided. EntityType is incorrect
try {
val jsonMap = Map("version" -> 1, "entity_type" -> "garbage", "entity_name" -> "x")
- configManager.processNotification(Some(Json.encode(jsonMap)))
+ configManager.ConfigChangedNotificationHandler.processNotification(Json.encode(jsonMap))
fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
}
catch {
@@ -133,7 +133,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
// EntityName isn't provided
try {
val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic)
- configManager.processNotification(Some(Json.encode(jsonMap)))
+ configManager.ConfigChangedNotificationHandler.processNotification(Json.encode(jsonMap))
fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
}
catch {
@@ -142,7 +142,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
// Everything is provided
val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic, "entity_name" -> "x")
- configManager.processNotification(Some(Json.encode(jsonMap)))
+ configManager.ConfigChangedNotificationHandler.processNotification(Json.encode(jsonMap))
// Verify that processConfigChanges was only called once
EasyMock.verify(handler)