You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/03/09 23:19:52 UTC
[kafka] branch 1.1 updated: MINOR: Fix deadlock in ZooKeeperClient.close() on session expiry (#4672)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 5778059 MINOR: Fix deadlock in ZooKeeperClient.close() on session expiry (#4672)
5778059 is described below
commit 5778059a91ff3144c228eb4c12f725c89e1e87a6
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Mar 9 23:18:23 2018 +0000
MINOR: Fix deadlock in ZooKeeperClient.close() on session expiry (#4672)
Reviewers: Jun Rao <ju...@gmail.com>
---
.../scala/kafka/zookeeper/ZooKeeperClient.scala | 37 ++++++++++++++--------
.../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 22 +++++++++++++
2 files changed, 45 insertions(+), 14 deletions(-)
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index efbd6e8..74a3a2d 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -301,13 +301,17 @@ class ZooKeeperClient(connectString: String,
stateChangeHandlers.remove(name)
}
- def close(): Unit = inWriteLock(initializationLock) {
+ def close(): Unit = {
info("Closing.")
- zNodeChangeHandlers.clear()
- zNodeChildChangeHandlers.clear()
- stateChangeHandlers.clear()
- zooKeeper.close()
- metricNames.foreach(removeMetric(_))
+ inWriteLock(initializationLock) {
+ zNodeChangeHandlers.clear()
+ zNodeChildChangeHandlers.clear()
+ stateChangeHandlers.clear()
+ zooKeeper.close()
+ metricNames.foreach(removeMetric(_))
+ }
+ // Shutdown scheduler outside of lock to avoid deadlock if scheduler
+ // is waiting for lock to process session expiry
expiryScheduler.shutdown()
info("Closed.")
}
@@ -348,6 +352,18 @@ class ZooKeeperClient(connectString: String,
initialize()
}
+ // Visibility for testing
+ private[zookeeper] def scheduleSessionExpiryHandler(): Unit = {
+ expiryScheduler.schedule("zk-session-expired", () => {
+ inWriteLock(initializationLock) {
+ info("Session expired.")
+ stateChangeHandlers.values.foreach(_.beforeInitializingSession())
+ initialize()
+ stateChangeHandlers.values.foreach(_.afterInitializingSession())
+ }
+ }, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
+ }
+
// package level visibility for testing only
private[zookeeper] object ZooKeeperClientWatcher extends Watcher {
override def process(event: WatchedEvent): Unit = {
@@ -363,14 +379,7 @@ class ZooKeeperClient(connectString: String,
error("Auth failed.")
stateChangeHandlers.values.foreach(_.onAuthFailure())
} else if (state == KeeperState.Expired) {
- expiryScheduler.schedule("zk-session-expired", () => {
- inWriteLock(initializationLock) {
- info("Session expired.")
- stateChangeHandlers.values.foreach(_.beforeInitializingSession())
- initialize()
- stateChangeHandlers.values.foreach(_.afterInitializingSession())
- }
- }, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
+ scheduleSessionExpiryHandler()
}
case Some(path) =>
(event.getType: @unchecked) match {
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index 2e0651c..77e11ea 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -486,6 +486,28 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
assertFalse("Expiry executor not shutdown", zooKeeperClient.expiryScheduler.isStarted)
}
+ @Test
+ def testSessionExpiryDuringClose(): Unit = {
+ val semaphore = new Semaphore(0)
+ val closeExecutor = Executors.newSingleThreadExecutor
+ try {
+ zooKeeperClient.expiryScheduler.schedule("test", () => semaphore.acquireUninterruptibly(),
+ delay = 0, period = -1, TimeUnit.SECONDS)
+ zooKeeperClient.scheduleSessionExpiryHandler()
+ val closeFuture = closeExecutor.submit(new Runnable {
+ override def run(): Unit = {
+ zooKeeperClient.close()
+ }
+ })
+ assertFalse("Close completed without shutting down expiry scheduler gracefully", closeFuture.isDone)
+ semaphore.release()
+ closeFuture.get(10, TimeUnit.SECONDS)
+ assertFalse("Expiry executor not shutdown", zooKeeperClient.expiryScheduler.isStarted)
+ } finally {
+ closeExecutor.shutdownNow()
+ }
+ }
+
def isExpectedMetricName(metricName: MetricName, name: String): Boolean =
metricName.getName == name && metricName.getGroup == "testMetricGroup" && metricName.getType == "testMetricType"
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.