Posted to commits@kafka.apache.org by ne...@apache.org on 2014/09/26 06:24:33 UTC

git commit: KAFKA-589 Clean shutdown after startup connection failure; reviewed by Neha Narkhede

Repository: kafka
Updated Branches:
  refs/heads/trunk f750dba65 -> 9c17747ba


KAFKA-589 Clean shutdown after startup connection failure; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9c17747b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9c17747b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9c17747b

Branch: refs/heads/trunk
Commit: 9c17747baab829adb268da28b4d943bbd6ef4e9f
Parents: f750dba
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Thu Sep 25 21:23:08 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Sep 25 21:23:10 2014 -0700

----------------------------------------------------------------------
 config/server.properties                        |   2 +-
 .../main/scala/kafka/server/KafkaServer.scala   | 167 ++++++++++---------
 .../kafka/server/KafkaServerStartable.scala     |   4 +-
 .../unit/kafka/server/ServerShutdownTest.scala  |  27 +++
 4 files changed, 121 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
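
For context, a minimal sketch (not part of this commit) of what the change
means for code that embeds KafkaServer: startup() now cleans up after itself
on a startup failure and then rethrows, so a caller only has to wait for that
shutdown to finish. KafkaServer, KafkaConfig, startup() and awaitShutdown()
all appear in the diff below; the surrounding code and the props variable are
illustrative assumptions.

    // Hypothetical embedder; props is assumed to hold valid broker properties.
    val server = new KafkaServer(new KafkaConfig(props))
    try {
      server.startup()
    } catch {
      case e: Throwable =>
        // startup() has already invoked shutdown() internally on failure,
        // so the caller only waits for it to complete before propagating.
        server.awaitShutdown()
        throw e
    }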


http://git-wip-us.apache.org/repos/asf/kafka/blob/9c17747b/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 5c0905a..b0e4496 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -118,4 +118,4 @@ log.cleaner.enable=false
 zookeeper.connect=localhost:2181
 
 # Timeout in ms for connecting to zookeeper
-zookeeper.connection.timeout.ms=6000
+zookeeper.connection.timeout.ms=2000
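
The example config above now ships with a shorter ZooKeeper connection
timeout. A hedged sketch of setting the same property programmatically,
reusing helpers that appear in the test added later in this commit
(TestUtils.createBrokerConfig, KafkaConfig); the port value comes from the
surrounding test harness, and 2000 ms simply mirrors the new
server.properties default:

    // Sketch only: build a broker config with the shorter ZooKeeper
    // connection timeout shown in the diff above.
    val props = TestUtils.createBrokerConfig(0, port)
    props.setProperty("zookeeper.connection.timeout.ms", "2000")
    val config = new KafkaConfig(props)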

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c17747b/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 14a7ba5..3e9e91f 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -70,62 +70,70 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
    */
   def startup() {
-    info("starting")
-    brokerState.newState(Starting)
-    isShuttingDown = new AtomicBoolean(false)
-    shutdownLatch = new CountDownLatch(1)
-
-    /* start scheduler */
-    kafkaScheduler.startup()
+    try {
+      info("starting")
+      brokerState.newState(Starting)
+      isShuttingDown = new AtomicBoolean(false)
+      shutdownLatch = new CountDownLatch(1)
+
+      /* start scheduler */
+      kafkaScheduler.startup()
     
-    /* setup zookeeper */
-    zkClient = initZk()
-
-    /* start log manager */
-    logManager = createLogManager(zkClient, brokerState)
-    logManager.startup()
-
-    socketServer = new SocketServer(config.brokerId,
-                                    config.hostName,
-                                    config.port,
-                                    config.numNetworkThreads,
-                                    config.queuedMaxRequests,
-                                    config.socketSendBufferBytes,
-                                    config.socketReceiveBufferBytes,
-                                    config.socketRequestMaxBytes,
-                                    config.maxConnectionsPerIp,
-                                    config.connectionsMaxIdleMs)
-    socketServer.startup()
-
-    replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
-
-    /* start offset manager */
-    offsetManager = createOffsetManager()
-
-    kafkaController = new KafkaController(config, zkClient, brokerState)
+      /* setup zookeeper */
+      zkClient = initZk()
+
+      /* start log manager */
+      logManager = createLogManager(zkClient, brokerState)
+      logManager.startup()
+
+      socketServer = new SocketServer(config.brokerId,
+                                      config.hostName,
+                                      config.port,
+                                      config.numNetworkThreads,
+                                      config.queuedMaxRequests,
+                                      config.socketSendBufferBytes,
+                                      config.socketReceiveBufferBytes,
+                                      config.socketRequestMaxBytes,
+                                      config.maxConnectionsPerIp,
+                                      config.connectionsMaxIdleMs)
+      socketServer.startup()
+
+      replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
+
+      /* start offset manager */
+      offsetManager = createOffsetManager()
+
+      kafkaController = new KafkaController(config, zkClient, brokerState)
     
-    /* start processing requests */
-    apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
-    requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
-    brokerState.newState(RunningAsBroker)
+      /* start processing requests */
+      apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
+      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
+      brokerState.newState(RunningAsBroker)
    
-    Mx4jLoader.maybeLoad()
+      Mx4jLoader.maybeLoad()
 
-    replicaManager.startup()
+      replicaManager.startup()
 
-    kafkaController.startup()
+      kafkaController.startup()
     
-    topicConfigManager = new TopicConfigManager(zkClient, logManager)
-    topicConfigManager.startup()
+      topicConfigManager = new TopicConfigManager(zkClient, logManager)
+      topicConfigManager.startup()
     
-    /* tell everyone we are alive */
-    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
-    kafkaHealthcheck.startup()
+      /* tell everyone we are alive */
+      kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
+      kafkaHealthcheck.startup()
 
     
-    registerStats()
-    startupComplete.set(true)
-    info("started")
+      registerStats()
+      startupComplete.set(true)
+      info("started")
+    }
+    catch {
+      case e: Throwable =>
+        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
+        shutdown()
+        throw e
+    }
   }
 
   private def initZk(): ZkClient = {
@@ -254,35 +262,42 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
    * Shuts down the LogManager, the SocketServer and the log cleaner scheduler thread
    */
   def shutdown() {
-    info("shutting down")
-    val canShutdown = isShuttingDown.compareAndSet(false, true)
-    if (canShutdown) {
-      Utils.swallow(controlledShutdown())
-      brokerState.newState(BrokerShuttingDown)
-      if(kafkaHealthcheck != null)
-        Utils.swallow(kafkaHealthcheck.shutdown())
-      if(socketServer != null)
-        Utils.swallow(socketServer.shutdown())
-      if(requestHandlerPool != null)
-        Utils.swallow(requestHandlerPool.shutdown())
-      if(offsetManager != null)
-        offsetManager.shutdown()
-      Utils.swallow(kafkaScheduler.shutdown())
-      if(apis != null)
-        Utils.swallow(apis.close())
-      if(replicaManager != null)
-        Utils.swallow(replicaManager.shutdown())
-      if(logManager != null)
-        Utils.swallow(logManager.shutdown())
-      if(kafkaController != null)
-        Utils.swallow(kafkaController.shutdown())
-      if(zkClient != null)
-        Utils.swallow(zkClient.close())
-
-      brokerState.newState(NotRunning)
-      shutdownLatch.countDown()
-      startupComplete.set(false)
-      info("shut down completed")
+    try {
+      info("shutting down")
+      val canShutdown = isShuttingDown.compareAndSet(false, true)
+      if (canShutdown) {
+        Utils.swallow(controlledShutdown())
+        brokerState.newState(BrokerShuttingDown)
+        if(kafkaHealthcheck != null)
+          Utils.swallow(kafkaHealthcheck.shutdown())
+        if(socketServer != null)
+          Utils.swallow(socketServer.shutdown())
+        if(requestHandlerPool != null)
+          Utils.swallow(requestHandlerPool.shutdown())
+        if(offsetManager != null)
+          offsetManager.shutdown()
+        Utils.swallow(kafkaScheduler.shutdown())
+        if(apis != null)
+          Utils.swallow(apis.close())
+        if(replicaManager != null)
+          Utils.swallow(replicaManager.shutdown())
+        if(logManager != null)
+          Utils.swallow(logManager.shutdown())
+        if(kafkaController != null)
+          Utils.swallow(kafkaController.shutdown())
+        if(zkClient != null)
+          Utils.swallow(zkClient.close())
+
+        brokerState.newState(NotRunning)
+        shutdownLatch.countDown()
+        startupComplete.set(false)
+        info("shut down completed")
+      }
+    }
+    catch {
+      case e: Throwable =>
+        fatal("Fatal error during KafkaServer shutdown.", e)
+        throw e
     }
   }
 

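A reading note on why the new catch block in startup() can call shutdown()
even when only part of the broker was initialized: shutdown() null-checks
each component before stopping it, and most teardowns are wrapped in
Utils.swallow, so a component that never started is simply skipped and one
failing teardown does not block the rest. Condensed illustration of that
idiom, using names from the diff above (not verbatim source):

    // Only stop what was actually created, and keep going if a teardown fails.
    if (logManager != null)
      Utils.swallow(logManager.shutdown())
    if (zkClient != null)
      Utils.swallow(zkClient.close())
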
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c17747b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index 28658bb..cd64bbe 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -29,8 +29,8 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
     }
     catch {
       case e: Throwable =>
-        fatal("Fatal error during KafkaServerStable startup. Prepare to shutdown", e)
-        shutdown()
+        fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
+        // KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code
         System.exit(1)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c17747b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index ab60e9b..3804a11 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -114,6 +114,33 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     verifyNonDaemonThreadsStatus
   }
 
+  @Test
+  def testCleanShutdownAfterFailedStartup() {
+    val newProps = TestUtils.createBrokerConfig(0, port)
+    newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535")
+    val newConfig = new KafkaConfig(newProps)
+    var server = new KafkaServer(newConfig)
+    try {
+      server.startup()
+      fail("Expected KafkaServer setup to fail, throw exception")
+    }
+    catch {
+      // Try to clean up carefully without hanging even if the test fails. This means trying to accurately
+      // identify the correct exception, making sure the server was shutdown, and cleaning up if anything
+      // goes wrong so that awaitShutdown doesn't hang
+      case e: org.I0Itec.zkclient.exception.ZkException =>
+        assertEquals(server.brokerState.currentState, NotRunning.state)
+        if (server.brokerState.currentState != NotRunning.state)
+          server.shutdown()
+      case e: Throwable =>
+        fail("Expected KafkaServer setup to fail with connection exception but caught a different exception.")
+        server.shutdown()
+    }
+    server.awaitShutdown()
+    Utils.rm(server.config.logDirs)
+    verifyNonDaemonThreadsStatus
+  }
+
   def verifyNonDaemonThreadsStatus() {
     assertEquals(0, Thread.getAllStackTraces.keySet().toArray
       .map(_.asInstanceOf[Thread])