You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/12/13 04:03:39 UTC
svn commit: r1213546 - in
/incubator/kafka/trunk/core/src/main/scala/kafka/server: KafkaServer.scala
KafkaServerStartable.scala
Author: junrao
Date: Tue Dec 13 03:03:38 2011
New Revision: 1213546
URL: http://svn.apache.org/viewvc?rev=1213546&view=rev
Log:
Embedded consumer doesn't shut down if the server can't start; patched by Jun Rao; reviewed by Neha Narkhede and Jay Kreps; KAFKA-197
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1213546&r1=1213545&r2=1213546&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Tue Dec 13 03:03:38 2011
@@ -47,42 +47,35 @@ class KafkaServer(val config: KafkaConfi
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
*/
def startup() {
- try {
- info("Starting Kafka server...")
- var needRecovery = true
- val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
- if (cleanShutDownFile.exists) {
- needRecovery = false
- cleanShutDownFile.delete
- }
- logManager = new LogManager(config,
- scheduler,
- SystemTime,
- 1000L * 60 * config.logCleanupIntervalMinutes,
- 1000L * 60 * 60 * config.logRetentionHours,
- needRecovery)
-
- val handlers = new KafkaRequestHandlers(logManager)
- socketServer = new SocketServer(config.port,
- config.numThreads,
- config.monitoringPeriodSecs,
- handlers.handlerFor,
- config.maxSocketRequestSize)
- Utils.registerMBean(socketServer.stats, statsMBeanName)
- socketServer.startup
- Mx4jLoader.maybeLoad
- /**
- * Registers this broker in ZK. After this, consumers can connect to broker.
- * So this should happen after socket server start.
- */
- logManager.startup
- info("Server started.")
- }
- catch {
- case e =>
- fatal("Fatal error during startup.", e)
- shutdown
+ info("Starting Kafka server...")
+ var needRecovery = true
+ val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
+ if (cleanShutDownFile.exists) {
+ needRecovery = false
+ cleanShutDownFile.delete
}
+ logManager = new LogManager(config,
+ scheduler,
+ SystemTime,
+ 1000L * 60 * config.logCleanupIntervalMinutes,
+ 1000L * 60 * 60 * config.logRetentionHours,
+ needRecovery)
+
+ val handlers = new KafkaRequestHandlers(logManager)
+ socketServer = new SocketServer(config.port,
+ config.numThreads,
+ config.monitoringPeriodSecs,
+ handlers.handlerFor,
+ config.maxSocketRequestSize)
+ Utils.registerMBean(socketServer.stats, statsMBeanName)
+ socketServer.startup()
+ Mx4jLoader.maybeLoad
+ /**
+ * Registers this broker in ZK. After this, consumers can connect to broker.
+ * So this should happen after socket server start.
+ */
+ logManager.startup()
+ info("Kafka server started.")
}
/**
@@ -92,25 +85,19 @@ class KafkaServer(val config: KafkaConfi
def shutdown() {
val canShutdown = isShuttingDown.compareAndSet(false, true);
if (canShutdown) {
- info("Shutting down...")
- try {
- scheduler.shutdown()
- if (socketServer != null)
- socketServer.shutdown()
- Utils.unregisterMBean(statsMBeanName)
- if (logManager != null)
- logManager.close()
+ info("Shutting down Kafka server")
+ scheduler.shutdown()
+ if (socketServer != null)
+ socketServer.shutdown()
+ Utils.unregisterMBean(statsMBeanName)
+ if (logManager != null)
+ logManager.close()
+
+ val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
+ cleanShutDownFile.createNewFile
- val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
- cleanShutDownFile.createNewFile
- }
- catch {
- case e =>
- fatal(e)
- fatal(Utils.stackTrace(e))
- }
shutdownLatch.countDown()
- info("shut down completed")
+ info("Kafka server shut down completed")
}
}
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala?rev=1213546&r1=1213545&r2=1213546&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala Tue Dec 13 03:03:38 2011
@@ -27,7 +27,7 @@ import scala.collection.Map
class KafkaServerStartable(val serverConfig: KafkaConfig,
val consumerConfig: ConsumerConfig,
- val producerConfig: ProducerConfig) {
+ val producerConfig: ProducerConfig) extends Logging {
private var server : KafkaServer = null
private var embeddedConsumer : EmbeddedConsumer = null
@@ -43,15 +43,29 @@ class KafkaServerStartable(val serverCon
}
def startup() {
- server.startup
- if (embeddedConsumer != null)
- embeddedConsumer.startup
+ try {
+ server.startup()
+ if (embeddedConsumer != null)
+ embeddedConsumer.startup()
+ }
+ catch {
+ case e =>
+ fatal("Fatal error during KafkaServerStable startup. Prepare to shutdown", e)
+ shutdown()
+ }
}
def shutdown() {
- if (embeddedConsumer != null)
- embeddedConsumer.shutdown
- server.shutdown
+ try {
+ if (embeddedConsumer != null)
+ embeddedConsumer.shutdown()
+ server.shutdown()
+ }
+ catch {
+ case e =>
+ fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e)
+ Runtime.getRuntime.halt(1)
+ }
}
def awaitShutdown() {