You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/12/13 04:03:39 UTC

svn commit: r1213546 - in /incubator/kafka/trunk/core/src/main/scala/kafka/server: KafkaServer.scala KafkaServerStartable.scala

Author: junrao
Date: Tue Dec 13 03:03:38 2011
New Revision: 1213546

URL: http://svn.apache.org/viewvc?rev=1213546&view=rev
Log:
Embedded consumer doesn't shut down if the server can't start; patched by Jun Rao; reviewed by Neha Narkhede and Jay Kreps; KAFKA-197

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1213546&r1=1213545&r2=1213546&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Tue Dec 13 03:03:38 2011
@@ -47,42 +47,35 @@ class KafkaServer(val config: KafkaConfi
    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
    */
   def startup() {
-    try {
-      info("Starting Kafka server...")
-      var needRecovery = true
-      val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
-      if (cleanShutDownFile.exists) {
-        needRecovery = false
-        cleanShutDownFile.delete
-      }
-      logManager = new LogManager(config,
-                                  scheduler,
-                                  SystemTime,
-                                  1000L * 60 * config.logCleanupIntervalMinutes,
-                                  1000L * 60 * 60 * config.logRetentionHours,
-                                  needRecovery)
-                                                    
-      val handlers = new KafkaRequestHandlers(logManager)
-      socketServer = new SocketServer(config.port,
-                                      config.numThreads,
-                                      config.monitoringPeriodSecs,
-                                      handlers.handlerFor,
-                                      config.maxSocketRequestSize)
-      Utils.registerMBean(socketServer.stats, statsMBeanName)
-      socketServer.startup
-      Mx4jLoader.maybeLoad
-      /**
-       *  Registers this broker in ZK. After this, consumers can connect to broker.
-       *  So this should happen after socket server start.
-       */
-      logManager.startup
-      info("Server started.")
-    }
-    catch {
-      case e =>
-        fatal("Fatal error during startup.", e)
-        shutdown
+    info("Starting Kafka server...")
+    var needRecovery = true
+    val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
+    if (cleanShutDownFile.exists) {
+      needRecovery = false
+      cleanShutDownFile.delete
     }
+    logManager = new LogManager(config,
+                                scheduler,
+                                SystemTime,
+                                1000L * 60 * config.logCleanupIntervalMinutes,
+                                1000L * 60 * 60 * config.logRetentionHours,
+                                needRecovery)
+                                                    
+    val handlers = new KafkaRequestHandlers(logManager)
+    socketServer = new SocketServer(config.port,
+                                    config.numThreads,
+                                    config.monitoringPeriodSecs,
+                                    handlers.handlerFor,
+                                    config.maxSocketRequestSize)
+    Utils.registerMBean(socketServer.stats, statsMBeanName)
+    socketServer.startup()
+    Mx4jLoader.maybeLoad
+    /**
+     *  Registers this broker in ZK. After this, consumers can connect to broker.
+     *  So this should happen after socket server start.
+     */
+    logManager.startup()
+    info("Kafka server started.")
   }
   
   /**
@@ -92,25 +85,19 @@ class KafkaServer(val config: KafkaConfi
   def shutdown() {
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
-      info("Shutting down...")
-      try {
-        scheduler.shutdown()
-        if (socketServer != null)
-          socketServer.shutdown()
-        Utils.unregisterMBean(statsMBeanName)
-        if (logManager != null)
-          logManager.close()
+      info("Shutting down Kafka server")
+      scheduler.shutdown()
+      if (socketServer != null)
+        socketServer.shutdown()
+      Utils.unregisterMBean(statsMBeanName)
+      if (logManager != null)
+        logManager.close()
+
+      val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
+      cleanShutDownFile.createNewFile
 
-        val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
-        cleanShutDownFile.createNewFile
-      }
-      catch {
-        case e =>
-          fatal(e)
-          fatal(Utils.stackTrace(e))
-      }
       shutdownLatch.countDown()
-      info("shut down completed")
+      info("Kafka server shut down completed")
     }
   }
   

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala?rev=1213546&r1=1213545&r2=1213546&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala Tue Dec 13 03:03:38 2011
@@ -27,7 +27,7 @@ import scala.collection.Map
 
 class KafkaServerStartable(val serverConfig: KafkaConfig,
                            val consumerConfig: ConsumerConfig,
-                           val producerConfig: ProducerConfig) {
+                           val producerConfig: ProducerConfig) extends Logging {
   private var server : KafkaServer = null
   private var embeddedConsumer : EmbeddedConsumer = null
 
@@ -43,15 +43,29 @@ class KafkaServerStartable(val serverCon
   }
 
   def startup() {
-    server.startup
-    if (embeddedConsumer != null)
-      embeddedConsumer.startup
+    try {
+      server.startup()
+      if (embeddedConsumer != null)
+        embeddedConsumer.startup()
+    }
+    catch {
+      case e =>
+        fatal("Fatal error during KafkaServerStable startup. Prepare to shutdown", e)
+        shutdown()
+    }
   }
 
   def shutdown() {
-    if (embeddedConsumer != null)
-      embeddedConsumer.shutdown
-    server.shutdown
+    try {
+      if (embeddedConsumer != null)
+        embeddedConsumer.shutdown()
+      server.shutdown()
+    }
+    catch {
+      case e =>
+        fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e)
+        Runtime.getRuntime.halt(1)
+    }
   }
 
   def awaitShutdown() {