You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/26 19:34:49 UTC

svn commit: r1175966 - in /incubator/kafka/trunk/core/src/main/scala/kafka: consumer/ZookeeperConsumerConnector.scala log/LogManager.scala producer/Producer.scala server/KafkaServer.scala utils/KafkaScheduler.scala

Author: junrao
Date: Mon Sep 26 17:34:48 2011
New Revision: 1175966

URL: http://svn.apache.org/viewvc?rev=1175966&view=rev
Log:
Log flush should complete upon broker shutdown; patched by Joel Koshy; reviewed by Jun Rao; KAFKA-126

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1175966&r1=1175965&r2=1175966&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Mon Sep 26 17:34:48 2011
@@ -122,7 +122,7 @@ private[kafka] class ZookeeperConsumerCo
     if (canShutdown) {
       logger.info("ZKConsumerConnector shutting down")
       try {
-        scheduler.shutdown()
+        scheduler.shutdownNow()
         fetcher match {
           case Some(f) => f.shutdown()
           case None =>

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala?rev=1175966&r1=1175965&r2=1175966&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala Mon Sep 26 17:34:48 2011
@@ -260,7 +260,7 @@ private[kafka] class LogManager(val conf
    * Close all the logs
    */
   def close() {
-    logFlusherScheduler.shutdown
+    logFlusherScheduler.shutdown()
     val iter = getLogIterator
     while(iter.hasNext)
       iter.next.close()
@@ -317,7 +317,7 @@ private[kafka] class LogManager(val conf
             case _ =>
           }
       }
-    }     
+    }
   }
 
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala?rev=1175966&r1=1175965&r2=1175966&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala Mon Sep 26 17:34:48 2011
@@ -37,6 +37,8 @@ class Producer[K,V](config: ProducerConf
   private val hasShutdown = new AtomicBoolean(false)
   if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerPartitionInfo))
     throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
+  if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerPartitionInfo))
+    logger.warn("Both zk.connect and broker.list provided (zk.connect takes precedence).")
   private val random = new java.util.Random
   // check if zookeeper based auto partition discovery is enabled
   private val zkEnabled = Utils.propertyExists(config.zkConnect)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1175966&r1=1175965&r2=1175966&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Mon Sep 26 17:34:48 2011
@@ -96,7 +96,7 @@ class KafkaServer(val config: KafkaConfi
     if (canShutdown) {
       logger.info("Shutting down...")
       try {
-        scheduler.shutdown
+        scheduler.shutdown()
         if (socketServer != null)
           socketServer.shutdown()
         Utils.swallow(logger.warn, Utils.unregisterMBean(statsMBeanName))

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1175966&r1=1175965&r2=1175966&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala Mon Sep 26 17:34:48 2011
@@ -36,12 +36,19 @@ class KafkaScheduler(val numThreads: Int
       t
     }
   })
-  
+  executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
+  executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+
   def scheduleWithRate(fun: () => Unit, delayMs: Long, periodMs: Long) =
     executor.scheduleAtFixedRate(Utils.loggedRunnable(fun), delayMs, periodMs, TimeUnit.MILLISECONDS)
 
-  def shutdown() = {
-    executor.shutdownNow
+  def shutdownNow() {
+    executor.shutdownNow()
+    logger.info("force shutdown scheduler " + baseThreadName)
+  }
+
+  def shutdown() {
+    executor.shutdown()
     logger.info("shutdown scheduler " + baseThreadName)
   }
 }