You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/01/05 21:03:24 UTC

kafka git commit: KAFKA-3045; ZkNodeChangeNotificationListener shouldn't log InterruptedException as error

Repository: kafka
Updated Branches:
  refs/heads/trunk 5db147c1d -> c44c898ec


KAFKA-3045; ZkNodeChangeNotificationListener shouldn't log InterruptedException as error

Author: Dong Lin <li...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #731 from lindong28/KAFKA-3045


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c44c898e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c44c898e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c44c898e

Branch: refs/heads/trunk
Commit: c44c898ec835bb16950137f831cbc18c6b8b82f5
Parents: 5db147c
Author: Dong Lin <li...@gmail.com>
Authored: Tue Jan 5 12:03:19 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jan 5 12:03:19 2016 -0800

----------------------------------------------------------------------
 .../ZkNodeChangeNotificationListener.scala      | 36 ++++++++++++++------
 .../security/auth/SimpleAclAuthorizer.scala     |  7 ++--
 2 files changed, 29 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c44c898e/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index a600d5d..91b4fb9 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -16,8 +16,11 @@
  */
 package kafka.common
 
+import java.util.concurrent.atomic.AtomicBoolean
+
 import kafka.utils.{Time, SystemTime, ZkUtils, Logging}
-import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
+import org.I0Itec.zkclient.exception.ZkInterruptedException
+import org.I0Itec.zkclient.IZkChildListener
 import scala.collection.JavaConverters._
 
 /**
@@ -37,7 +40,7 @@ trait NotificationHandler {
  * The caller/user of this class should ensure that they use zkClient.subscribeStateChanges and call processAllNotifications
  * method of this class from ZkStateChangeListener's handleNewSession() method. This is necessary to ensure that if zk session
  * is terminated and reestablished any missed notification will be processed immediately.
- * @param zkClient
+ * @param zkUtils
  * @param seqNodeRoot
  * @param seqNodePrefix
  * @param notificationHandler
@@ -51,6 +54,7 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
                                        private val changeExpirationMs: Long = 15 * 60 * 1000,
                                        private val time: Time = SystemTime) extends Logging {
   private var lastExecutedChange = -1L
+  private val isClosed = new AtomicBoolean(false)
 
   /**
    * create seqNodeRoot and begin watching for any new children nodes.
@@ -61,6 +65,10 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
     processAllNotifications()
   }
 
+  def close() = {
+    isClosed.set(true)
+  }
+
   /**
    * Process all changes
    */
@@ -75,17 +83,23 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
   private def processNotifications(notifications: Seq[String]) {
     if (notifications.nonEmpty) {
       info(s"Processing notification(s) to $seqNodeRoot")
-      val now = time.milliseconds
-      for (notification <- notifications) {
-        val changeId = changeNumber(notification)
-        if (changeId > lastExecutedChange) {
-          val changeZnode = seqNodeRoot + "/" + notification
-          val (data, stat) = zkUtils.readDataMaybeNull(changeZnode)
-          data map (notificationHandler.processNotification(_)) getOrElse(logger.warn(s"read null data from $changeZnode when processing notification $notification"))
+      try {
+        val now = time.milliseconds
+        for (notification <- notifications) {
+          val changeId = changeNumber(notification)
+          if (changeId > lastExecutedChange) {
+            val changeZnode = seqNodeRoot + "/" + notification
+            val (data, stat) = zkUtils.readDataMaybeNull(changeZnode)
+            data map (notificationHandler.processNotification(_)) getOrElse (logger.warn(s"read null data from $changeZnode when processing notification $notification"))
+          }
+          lastExecutedChange = changeId
         }
-        lastExecutedChange = changeId
+        purgeObsoleteNotifications(now, notifications)
+      } catch {
+        case e: ZkInterruptedException =>
+          if (!isClosed.get)
+            throw e
       }
-      purgeObsoleteNotifications(now, notifications)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c44c898e/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 780bdf3..f716e16 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -26,7 +26,7 @@ import kafka.network.RequestChannel.Session
 import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
-import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
+import org.I0Itec.zkclient.IZkStateListener
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import scala.collection.JavaConverters._
@@ -105,7 +105,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     loadCache()
 
     zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath)
-    aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificaitonHandler)
+    aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler)
     aclChangeListener.init()
 
     zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener)
@@ -230,6 +230,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   }
 
   def close() {
+    if (aclChangeListener != null) aclChangeListener.close()
     if (zkUtils != null) zkUtils.close()
   }
 
@@ -269,7 +270,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     zkUtils.createSequentialPersistentPath(SimpleAclAuthorizer.AclChangedZkPath + "/" + SimpleAclAuthorizer.AclChangedPrefix, resource.toString)
   }
 
-  object AclChangedNotificaitonHandler extends NotificationHandler {
+  object AclChangedNotificationHandler extends NotificationHandler {
 
     override def processNotification(notificationMessage: String) {
       val resource: Resource = Resource.fromString(notificationMessage)