You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/05/08 22:59:36 UTC

[kafka] branch 1.1 updated: KAFKA-6879; Invoke session init callbacks outside lock to avoid Controller deadlock (#4977)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 80396da  KAFKA-6879; Invoke session init callbacks outside lock to avoid Controller deadlock (#4977)
80396da is described below

commit 80396dab05754be30cb9681671e65c479a4005af
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue May 8 15:52:43 2018 -0700

    KAFKA-6879; Invoke session init callbacks outside lock to avoid Controller deadlock (#4977)
    
    Fixes a deadlock in which the controller's beforeInitializingSession callback holds the ZooKeeper client initialization lock while awaiting completion of an asynchronous event that itself depends on the same lock.
    
    Also catch and log callback exceptions to ensure the ZooKeeper reconnection takes place.
    Finally, configure KafkaScheduler in ZooKeeperClient to have at least 1 thread.
    
    Added tests that fail or hang without the changes in this PR.
    
    Reviewers: Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>
---
 .../scala/kafka/controller/KafkaController.scala   | 13 ++--
 .../main/scala/kafka/utils/KafkaScheduler.scala    |  8 ++-
 .../scala/kafka/zookeeper/ZooKeeperClient.scala    | 74 +++++++++++++-------
 .../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 80 +++++++++++++++++++++-
 4 files changed, 140 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index ed2fb90..97b6406 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -160,7 +160,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       override def beforeInitializingSession(): Unit = {
         val expireEvent = new Expire
         eventManager.clearAndPut(expireEvent)
-        expireEvent.waitUntilProcessed()
+
+        // Block initialization of the new session until the expiration event is being handled,
+        // which ensures that all pending events have been processed before creating the new session
+        expireEvent.waitUntilProcessingStarted()
       }
     })
     eventManager.put(Startup)
@@ -1491,17 +1494,17 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   // We can't make this a case object due to the countDownLatch field
   class Expire extends ControllerEvent {
-    private val countDownLatch = new CountDownLatch(1)
+    private val processingStarted = new CountDownLatch(1)
     override def state = ControllerState.ControllerChange
 
     override def process(): Unit = {
-      countDownLatch.countDown()
+      processingStarted.countDown()
       activeControllerId = -1
       onControllerResignation()
     }
 
-    def waitUntilProcessed(): Unit = {
-      countDownLatch.await()
+    def waitUntilProcessingStarted(): Unit = {
+      processingStarted.await()
     }
   }
 
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 5407934..24eb177 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -99,11 +99,15 @@ class KafkaScheduler(val threads: Int,
     }
   }
 
-  def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) {
+  def scheduleOnce(name: String, fun: () => Unit): Unit = {
+    schedule(name, fun, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
+  }
+
+  def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit) {
     debug("Scheduling task %s with initial delay %d ms and period %d ms."
         .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
     this synchronized {
-      ensureRunning
+      ensureRunning()
       val runnable = CoreUtils.runnable {
         try {
           trace("Beginning execution of scheduled task '%s'.".format(name))
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 5c4cd68..5cb127c 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -59,7 +59,7 @@ class ZooKeeperClient(connectString: String,
   private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
   private val inFlightRequests = new Semaphore(maxInFlightRequests)
   private val stateChangeHandlers = new ConcurrentHashMap[String, StateChangeHandler]().asScala
-  private[zookeeper] val expiryScheduler = new KafkaScheduler(0, "zk-session-expiry-handler")
+  private[zookeeper] val expiryScheduler = new KafkaScheduler(threads = 1, "zk-session-expiry-handler")
 
   private val metricNames = Set[String]()
 
@@ -325,43 +325,65 @@ class ZooKeeperClient(connectString: String,
     zooKeeper
   }
   
-  private def initialize(): Unit = {
-    if (!connectionState.isAlive) {
-      zooKeeper.close()
-      info(s"Initializing a new session to $connectString.")
-      // retry forever until ZooKeeper can be instantiated
-      var connected = false
-      while (!connected) {
-        try {
-          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
-          connected = true
-        } catch {
-          case e: Exception =>
-            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
-            Thread.sleep(1000)
+  private def reinitialize(): Unit = {
+    // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
+    // may require additional Zookeeper requests, which will block to acquire the initialization lock
+    stateChangeHandlers.values.foreach(callBeforeInitializingSession _)
+
+    inWriteLock(initializationLock) {
+      if (!connectionState.isAlive) {
+        zooKeeper.close()
+        info(s"Initializing a new session to $connectString.")
+        // retry forever until ZooKeeper can be instantiated
+        var connected = false
+        while (!connected) {
+          try {
+            zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
+            connected = true
+          } catch {
+            case e: Exception =>
+              info("Error when recreating ZooKeeper, retrying after a short sleep", e)
+              Thread.sleep(1000)
+          }
         }
       }
     }
+
+    stateChangeHandlers.values.foreach(callAfterInitializingSession _)
   }
 
   /**
-   * reinitialize method to use in unit tests
+   * Close the zookeeper client to force session reinitialization. This is visible for testing only.
    */
-  private[zookeeper] def reinitialize(): Unit = {
+  private[zookeeper] def forceReinitialize(): Unit = {
     zooKeeper.close()
-    initialize()
+    reinitialize()
+  }
+
+  private def callBeforeInitializingSession(handler: StateChangeHandler): Unit = {
+    try {
+      handler.beforeInitializingSession()
+    } catch {
+      case t: Throwable =>
+        error(s"Uncaught error in handler ${handler.name}", t)
+    }
+  }
+
+  private def callAfterInitializingSession(handler: StateChangeHandler): Unit = {
+    try {
+      handler.afterInitializingSession()
+    } catch {
+      case t: Throwable =>
+        error(s"Uncaught error in handler ${handler.name}", t)
+    }
   }
 
   // Visibility for testing
   private[zookeeper] def scheduleSessionExpiryHandler(): Unit = {
-    expiryScheduler.schedule("zk-session-expired", () => {
-      inWriteLock(initializationLock) {
-        info("Session expired.")
-        stateChangeHandlers.values.foreach(_.beforeInitializingSession())
-        initialize()
-        stateChangeHandlers.values.foreach(_.afterInitializingSession())
-      }
-    }, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
+    expiryScheduler.scheduleOnce("zk-session-expired", () => {
+      info("Session expired.")
+      reinitialize()
+    })
   }
 
   // package level visibility for testing only
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index 4eb9e67..25a3278 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -19,7 +19,7 @@ package kafka.zookeeper
 import java.net.UnknownHostException
 import java.nio.charset.StandardCharsets
 import java.util.UUID
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, Executors, Semaphore, TimeUnit}
 
 import com.yammer.metrics.Metrics
@@ -306,6 +306,82 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testBlockOnRequestCompletionFromStateChangeHandler(): Unit = {
+    // This tests the scenario exposed by KAFKA-6879 in which the expiration callback awaits
+    // completion of a request which is handled by another thread
+
+    val latch = new CountDownLatch(1)
+    val stateChangeHandler = new StateChangeHandler {
+      override val name = this.getClass.getName
+      override def beforeInitializingSession(): Unit = {
+        latch.await()
+      }
+    }
+
+    val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
+      "testMetricGroup", "testMetricType")
+    client.registerStateChangeHandler(stateChangeHandler)
+
+    val requestThread = new Thread() {
+      override def run(): Unit = {
+        try
+          client.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
+            ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+        finally
+          latch.countDown()
+      }
+    }
+
+    val reinitializeThread = new Thread() {
+      override def run(): Unit = {
+        client.forceReinitialize()
+      }
+    }
+
+    reinitializeThread.start()
+
+    // sleep briefly before starting the request thread so that the initialization
+    // thread is blocking on the latch
+    Thread.sleep(100)
+    requestThread.start()
+
+    reinitializeThread.join()
+    requestThread.join()
+  }
+
+  @Test
+  def testExceptionInBeforeInitializingSession(): Unit = {
+    val faultyHandler = new StateChangeHandler {
+      override val name = this.getClass.getName
+      override def beforeInitializingSession(): Unit = {
+        throw new RuntimeException()
+      }
+    }
+
+    val goodHandler = new StateChangeHandler {
+      val calls = new AtomicInteger(0)
+      override val name = this.getClass.getName
+      override def beforeInitializingSession(): Unit = {
+        calls.incrementAndGet()
+      }
+    }
+
+    val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
+      "testMetricGroup", "testMetricType")
+    client.registerStateChangeHandler(faultyHandler)
+    client.registerStateChangeHandler(goodHandler)
+
+    client.forceReinitialize()
+
+    assertEquals(1, goodHandler.calls.get)
+
+    // Client should be usable even if the callback throws an error
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
+  }
+
+  @Test
   def testZNodeChildChangeHandlerForChildChange(): Unit = {
     import scala.collection.JavaConverters._
     val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1)
@@ -344,7 +420,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
       "testMetricGroup", "testMetricType")
     try {
       zooKeeperClient.registerStateChangeHandler(stateChangeHandler)
-      zooKeeperClient.reinitialize()
+      zooKeeperClient.forceReinitialize()
 
       assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
     } finally zooKeeperClient.close()

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.