You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/01/05 21:03:24 UTC
kafka git commit: KAFKA-3045;
ZkNodeChangeNotificationListener shouldn't log InterruptedException
as error
Repository: kafka
Updated Branches:
refs/heads/trunk 5db147c1d -> c44c898ec
KAFKA-3045; ZkNodeChangeNotificationListener shouldn't log InterruptedException as error
Author: Dong Lin <li...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #731 from lindong28/KAFKA-3045
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c44c898e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c44c898e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c44c898e
Branch: refs/heads/trunk
Commit: c44c898ec835bb16950137f831cbc18c6b8b82f5
Parents: 5db147c
Author: Dong Lin <li...@gmail.com>
Authored: Tue Jan 5 12:03:19 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jan 5 12:03:19 2016 -0800
----------------------------------------------------------------------
.../ZkNodeChangeNotificationListener.scala | 36 ++++++++++++++------
.../security/auth/SimpleAclAuthorizer.scala | 7 ++--
2 files changed, 29 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c44c898e/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index a600d5d..91b4fb9 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -16,8 +16,11 @@
*/
package kafka.common
+import java.util.concurrent.atomic.AtomicBoolean
+
import kafka.utils.{Time, SystemTime, ZkUtils, Logging}
-import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
+import org.I0Itec.zkclient.exception.ZkInterruptedException
+import org.I0Itec.zkclient.IZkChildListener
import scala.collection.JavaConverters._
/**
@@ -37,7 +40,7 @@ trait NotificationHandler {
* The caller/user of this class should ensure that they use zkClient.subscribeStateChanges and call processAllNotifications
* method of this class from ZkStateChangeListener's handleNewSession() method. This is necessary to ensure that if zk session
* is terminated and reestablished any missed notification will be processed immediately.
- * @param zkClient
+ * @param zkUtils
* @param seqNodeRoot
* @param seqNodePrefix
* @param notificationHandler
@@ -51,6 +54,7 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
private val changeExpirationMs: Long = 15 * 60 * 1000,
private val time: Time = SystemTime) extends Logging {
private var lastExecutedChange = -1L
+ private val isClosed = new AtomicBoolean(false)
/**
* create seqNodeRoot and begin watching for any new children nodes.
@@ -61,6 +65,10 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
processAllNotifications()
}
+ def close() = {
+ isClosed.set(true)
+ }
+
/**
* Process all changes
*/
@@ -75,17 +83,23 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
private def processNotifications(notifications: Seq[String]) {
if (notifications.nonEmpty) {
info(s"Processing notification(s) to $seqNodeRoot")
- val now = time.milliseconds
- for (notification <- notifications) {
- val changeId = changeNumber(notification)
- if (changeId > lastExecutedChange) {
- val changeZnode = seqNodeRoot + "/" + notification
- val (data, stat) = zkUtils.readDataMaybeNull(changeZnode)
- data map (notificationHandler.processNotification(_)) getOrElse(logger.warn(s"read null data from $changeZnode when processing notification $notification"))
+ try {
+ val now = time.milliseconds
+ for (notification <- notifications) {
+ val changeId = changeNumber(notification)
+ if (changeId > lastExecutedChange) {
+ val changeZnode = seqNodeRoot + "/" + notification
+ val (data, stat) = zkUtils.readDataMaybeNull(changeZnode)
+ data map (notificationHandler.processNotification(_)) getOrElse (logger.warn(s"read null data from $changeZnode when processing notification $notification"))
+ }
+ lastExecutedChange = changeId
}
- lastExecutedChange = changeId
+ purgeObsoleteNotifications(now, notifications)
+ } catch {
+ case e: ZkInterruptedException =>
+ if (!isClosed.get)
+ throw e
}
- purgeObsoleteNotifications(now, notifications)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c44c898e/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 780bdf3..f716e16 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -26,7 +26,7 @@ import kafka.network.RequestChannel.Session
import kafka.server.KafkaConfig
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
-import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
+import org.I0Itec.zkclient.IZkStateListener
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.KafkaPrincipal
import scala.collection.JavaConverters._
@@ -105,7 +105,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
loadCache()
zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath)
- aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificaitonHandler)
+ aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler)
aclChangeListener.init()
zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener)
@@ -230,6 +230,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
}
def close() {
+ if (aclChangeListener != null) aclChangeListener.close()
if (zkUtils != null) zkUtils.close()
}
@@ -269,7 +270,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
zkUtils.createSequentialPersistentPath(SimpleAclAuthorizer.AclChangedZkPath + "/" + SimpleAclAuthorizer.AclChangedPrefix, resource.toString)
}
- object AclChangedNotificaitonHandler extends NotificationHandler {
+ object AclChangedNotificationHandler extends NotificationHandler {
override def processNotification(notificationMessage: String) {
val resource: Resource = Resource.fromString(notificationMessage)