You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/03/09 23:18:28 UTC

[kafka] branch trunk updated: MINOR: Fix deadlock in ZooKeeperClient.close() on session expiry (#4672)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8d4d5f8  MINOR: Fix deadlock in ZooKeeperClient.close() on session expiry (#4672)
8d4d5f8 is described below

commit 8d4d5f8c9ffeba45022014bdcf98eddbd3a3afc8
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Mar 9 23:18:23 2018 +0000

    MINOR: Fix deadlock in ZooKeeperClient.close() on session expiry (#4672)
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 .../scala/kafka/zookeeper/ZooKeeperClient.scala    | 37 ++++++++++++++--------
 .../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 22 +++++++++++++
 2 files changed, 45 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index efbd6e8..74a3a2d 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -301,13 +301,17 @@ class ZooKeeperClient(connectString: String,
     stateChangeHandlers.remove(name)
   }
 
-  def close(): Unit = inWriteLock(initializationLock) {
+  def close(): Unit = {
     info("Closing.")
-    zNodeChangeHandlers.clear()
-    zNodeChildChangeHandlers.clear()
-    stateChangeHandlers.clear()
-    zooKeeper.close()
-    metricNames.foreach(removeMetric(_))
+    inWriteLock(initializationLock) {
+      zNodeChangeHandlers.clear()
+      zNodeChildChangeHandlers.clear()
+      stateChangeHandlers.clear()
+      zooKeeper.close()
+      metricNames.foreach(removeMetric(_))
+    }
+    // Shut down the scheduler outside of the lock to avoid a deadlock if the
+    // scheduler thread is waiting for the lock to process session expiry
     expiryScheduler.shutdown()
     info("Closed.")
   }
@@ -348,6 +352,18 @@ class ZooKeeperClient(connectString: String,
     initialize()
   }
 
+  // package level visibility for testing only
+  private[zookeeper] def scheduleSessionExpiryHandler(): Unit = {
+    expiryScheduler.schedule("zk-session-expired", () => {
+      inWriteLock(initializationLock) {
+        info("Session expired.")
+        stateChangeHandlers.values.foreach(_.beforeInitializingSession())
+        initialize()
+        stateChangeHandlers.values.foreach(_.afterInitializingSession())
+      }
+    }, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
+  }
+
   // package level visibility for testing only
   private[zookeeper] object ZooKeeperClientWatcher extends Watcher {
     override def process(event: WatchedEvent): Unit = {
@@ -363,14 +379,7 @@ class ZooKeeperClient(connectString: String,
             error("Auth failed.")
             stateChangeHandlers.values.foreach(_.onAuthFailure())
           } else if (state == KeeperState.Expired) {
-            expiryScheduler.schedule("zk-session-expired", () => {
-              inWriteLock(initializationLock) {
-                info("Session expired.")
-                stateChangeHandlers.values.foreach(_.beforeInitializingSession())
-                initialize()
-                stateChangeHandlers.values.foreach(_.afterInitializingSession())
-              }
-            }, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
+            scheduleSessionExpiryHandler()
           }
         case Some(path) =>
           (event.getType: @unchecked) match {
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index 2e0651c..77e11ea 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -486,6 +486,28 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     assertFalse("Expiry executor not shutdown", zooKeeperClient.expiryScheduler.isStarted)
   }
 
+  @Test
+  def testSessionExpiryDuringClose(): Unit = {
+    val semaphore = new Semaphore(0)
+    val closeExecutor = Executors.newSingleThreadExecutor
+    try {
+      zooKeeperClient.expiryScheduler.schedule("test", () => semaphore.acquireUninterruptibly(),
+        delay = 0, period = -1, TimeUnit.SECONDS)
+      zooKeeperClient.scheduleSessionExpiryHandler()
+      val closeFuture = closeExecutor.submit(new Runnable {
+        override def run(): Unit = {
+          zooKeeperClient.close()
+        }
+      })
+      assertFalse("Close completed without shutting down expiry scheduler gracefully", closeFuture.isDone)
+      semaphore.release()
+      closeFuture.get(10, TimeUnit.SECONDS)
+      assertFalse("Expiry executor not shutdown", zooKeeperClient.expiryScheduler.isStarted)
+    } finally {
+      closeExecutor.shutdownNow()
+    }
+  }
+
   def isExpectedMetricName(metricName: MetricName, name: String): Boolean =
     metricName.getName == name && metricName.getGroup == "testMetricGroup" && metricName.getType == "testMetricType"
 

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.