You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/09/26 06:24:33 UTC
git commit: KAFKA-589 Clean shutdown after startup connection failure;
reviewed by Neha Narkhede
Repository: kafka
Updated Branches:
refs/heads/trunk f750dba65 -> 9c17747ba
KAFKA-589 Clean shutdown after startup connection failure; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9c17747b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9c17747b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9c17747b
Branch: refs/heads/trunk
Commit: 9c17747baab829adb268da28b4d943bbd6ef4e9f
Parents: f750dba
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Thu Sep 25 21:23:08 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Sep 25 21:23:10 2014 -0700
----------------------------------------------------------------------
config/server.properties | 2 +-
.../main/scala/kafka/server/KafkaServer.scala | 167 ++++++++++---------
.../kafka/server/KafkaServerStartable.scala | 4 +-
.../unit/kafka/server/ServerShutdownTest.scala | 27 +++
4 files changed, 121 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c17747b/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 5c0905a..b0e4496 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -118,4 +118,4 @@ log.cleaner.enable=false
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
-zookeeper.connection.timeout.ms=6000
+zookeeper.connection.timeout.ms=2000
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c17747b/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 14a7ba5..3e9e91f 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -70,62 +70,70 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
*/
def startup() {
- info("starting")
- brokerState.newState(Starting)
- isShuttingDown = new AtomicBoolean(false)
- shutdownLatch = new CountDownLatch(1)
-
- /* start scheduler */
- kafkaScheduler.startup()
+ try {
+ info("starting")
+ brokerState.newState(Starting)
+ isShuttingDown = new AtomicBoolean(false)
+ shutdownLatch = new CountDownLatch(1)
+
+ /* start scheduler */
+ kafkaScheduler.startup()
- /* setup zookeeper */
- zkClient = initZk()
-
- /* start log manager */
- logManager = createLogManager(zkClient, brokerState)
- logManager.startup()
-
- socketServer = new SocketServer(config.brokerId,
- config.hostName,
- config.port,
- config.numNetworkThreads,
- config.queuedMaxRequests,
- config.socketSendBufferBytes,
- config.socketReceiveBufferBytes,
- config.socketRequestMaxBytes,
- config.maxConnectionsPerIp,
- config.connectionsMaxIdleMs)
- socketServer.startup()
-
- replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
-
- /* start offset manager */
- offsetManager = createOffsetManager()
-
- kafkaController = new KafkaController(config, zkClient, brokerState)
+ /* setup zookeeper */
+ zkClient = initZk()
+
+ /* start log manager */
+ logManager = createLogManager(zkClient, brokerState)
+ logManager.startup()
+
+ socketServer = new SocketServer(config.brokerId,
+ config.hostName,
+ config.port,
+ config.numNetworkThreads,
+ config.queuedMaxRequests,
+ config.socketSendBufferBytes,
+ config.socketReceiveBufferBytes,
+ config.socketRequestMaxBytes,
+ config.maxConnectionsPerIp,
+ config.connectionsMaxIdleMs)
+ socketServer.startup()
+
+ replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
+
+ /* start offset manager */
+ offsetManager = createOffsetManager()
+
+ kafkaController = new KafkaController(config, zkClient, brokerState)
- /* start processing requests */
- apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
- requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
- brokerState.newState(RunningAsBroker)
+ /* start processing requests */
+ apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
+ requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
+ brokerState.newState(RunningAsBroker)
- Mx4jLoader.maybeLoad()
+ Mx4jLoader.maybeLoad()
- replicaManager.startup()
+ replicaManager.startup()
- kafkaController.startup()
+ kafkaController.startup()
- topicConfigManager = new TopicConfigManager(zkClient, logManager)
- topicConfigManager.startup()
+ topicConfigManager = new TopicConfigManager(zkClient, logManager)
+ topicConfigManager.startup()
- /* tell everyone we are alive */
- kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
- kafkaHealthcheck.startup()
+ /* tell everyone we are alive */
+ kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
+ kafkaHealthcheck.startup()
- registerStats()
- startupComplete.set(true)
- info("started")
+ registerStats()
+ startupComplete.set(true)
+ info("started")
+ }
+ catch {
+ case e: Throwable =>
+ fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
+ shutdown()
+ throw e
+ }
}
private def initZk(): ZkClient = {
@@ -254,35 +262,42 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
* Shuts down the LogManager, the SocketServer and the log cleaner scheduler thread
*/
def shutdown() {
- info("shutting down")
- val canShutdown = isShuttingDown.compareAndSet(false, true)
- if (canShutdown) {
- Utils.swallow(controlledShutdown())
- brokerState.newState(BrokerShuttingDown)
- if(kafkaHealthcheck != null)
- Utils.swallow(kafkaHealthcheck.shutdown())
- if(socketServer != null)
- Utils.swallow(socketServer.shutdown())
- if(requestHandlerPool != null)
- Utils.swallow(requestHandlerPool.shutdown())
- if(offsetManager != null)
- offsetManager.shutdown()
- Utils.swallow(kafkaScheduler.shutdown())
- if(apis != null)
- Utils.swallow(apis.close())
- if(replicaManager != null)
- Utils.swallow(replicaManager.shutdown())
- if(logManager != null)
- Utils.swallow(logManager.shutdown())
- if(kafkaController != null)
- Utils.swallow(kafkaController.shutdown())
- if(zkClient != null)
- Utils.swallow(zkClient.close())
-
- brokerState.newState(NotRunning)
- shutdownLatch.countDown()
- startupComplete.set(false)
- info("shut down completed")
+ try {
+ info("shutting down")
+ val canShutdown = isShuttingDown.compareAndSet(false, true)
+ if (canShutdown) {
+ Utils.swallow(controlledShutdown())
+ brokerState.newState(BrokerShuttingDown)
+ if(kafkaHealthcheck != null)
+ Utils.swallow(kafkaHealthcheck.shutdown())
+ if(socketServer != null)
+ Utils.swallow(socketServer.shutdown())
+ if(requestHandlerPool != null)
+ Utils.swallow(requestHandlerPool.shutdown())
+ if(offsetManager != null)
+ offsetManager.shutdown()
+ Utils.swallow(kafkaScheduler.shutdown())
+ if(apis != null)
+ Utils.swallow(apis.close())
+ if(replicaManager != null)
+ Utils.swallow(replicaManager.shutdown())
+ if(logManager != null)
+ Utils.swallow(logManager.shutdown())
+ if(kafkaController != null)
+ Utils.swallow(kafkaController.shutdown())
+ if(zkClient != null)
+ Utils.swallow(zkClient.close())
+
+ brokerState.newState(NotRunning)
+ shutdownLatch.countDown()
+ startupComplete.set(false)
+ info("shut down completed")
+ }
+ }
+ catch {
+ case e: Throwable =>
+ fatal("Fatal error during KafkaServer shutdown.", e)
+ throw e
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c17747b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index 28658bb..cd64bbe 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -29,8 +29,8 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
}
catch {
case e: Throwable =>
- fatal("Fatal error during KafkaServerStable startup. Prepare to shutdown", e)
- shutdown()
+ fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
+ // KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code
System.exit(1)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c17747b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index ab60e9b..3804a11 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -114,6 +114,33 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
verifyNonDaemonThreadsStatus
}
+ @Test
+ def testCleanShutdownAfterFailedStartup() {
+ val newProps = TestUtils.createBrokerConfig(0, port)
+ newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535")
+ val newConfig = new KafkaConfig(newProps)
+ var server = new KafkaServer(newConfig)
+ try {
+ server.startup()
+ fail("Expected KafkaServer setup to fail, throw exception")
+ }
+ catch {
+ // Try to clean up carefully without hanging even if the test fails. This means trying to accurately
+ // identify the correct exception, making sure the server was shut down, and cleaning up if anything
+ // goes wrong so that awaitShutdown doesn't hang
+ case e: org.I0Itec.zkclient.exception.ZkException =>
+ assertEquals(server.brokerState.currentState, NotRunning.state)
+ if (server.brokerState.currentState != NotRunning.state)
+ server.shutdown()
+ case e: Throwable =>
+ fail("Expected KafkaServer setup to fail with connection exception but caught a different exception.")
+ server.shutdown()
+ }
+ server.awaitShutdown()
+ Utils.rm(server.config.logDirs)
+ verifyNonDaemonThreadsStatus
+ }
+
def verifyNonDaemonThreadsStatus() {
assertEquals(0, Thread.getAllStackTraces.keySet().toArray
.map(_.asInstanceOf[Thread])