You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/26 19:34:49 UTC
svn commit: r1175966 - in /incubator/kafka/trunk/core/src/main/scala/kafka:
consumer/ZookeeperConsumerConnector.scala log/LogManager.scala
producer/Producer.scala server/KafkaServer.scala utils/KafkaScheduler.scala
Author: junrao
Date: Mon Sep 26 17:34:48 2011
New Revision: 1175966
URL: http://svn.apache.org/viewvc?rev=1175966&view=rev
Log:
Log flush should complete upon broker shutdown; patched by Joel Koshy; reviewed by Jun Rao; KAFKA-126
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1175966&r1=1175965&r2=1175966&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Mon Sep 26 17:34:48 2011
@@ -122,7 +122,7 @@ private[kafka] class ZookeeperConsumerCo
if (canShutdown) {
logger.info("ZKConsumerConnector shutting down")
try {
- scheduler.shutdown()
+ scheduler.shutdownNow()
fetcher match {
case Some(f) => f.shutdown()
case None =>
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala?rev=1175966&r1=1175965&r2=1175966&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala Mon Sep 26 17:34:48 2011
@@ -260,7 +260,7 @@ private[kafka] class LogManager(val conf
* Close all the logs
*/
def close() {
- logFlusherScheduler.shutdown
+ logFlusherScheduler.shutdown()
val iter = getLogIterator
while(iter.hasNext)
iter.next.close()
@@ -317,7 +317,7 @@ private[kafka] class LogManager(val conf
case _ =>
}
}
- }
+ }
}
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala?rev=1175966&r1=1175965&r2=1175966&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala Mon Sep 26 17:34:48 2011
@@ -37,6 +37,8 @@ class Producer[K,V](config: ProducerConf
private val hasShutdown = new AtomicBoolean(false)
if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerPartitionInfo))
throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
+ if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerPartitionInfo))
+ logger.warn("Both zk.connect and broker.list provided (zk.connect takes precedence).")
private val random = new java.util.Random
// check if zookeeper based auto partition discovery is enabled
private val zkEnabled = Utils.propertyExists(config.zkConnect)
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1175966&r1=1175965&r2=1175966&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Mon Sep 26 17:34:48 2011
@@ -96,7 +96,7 @@ class KafkaServer(val config: KafkaConfi
if (canShutdown) {
logger.info("Shutting down...")
try {
- scheduler.shutdown
+ scheduler.shutdown()
if (socketServer != null)
socketServer.shutdown()
Utils.swallow(logger.warn, Utils.unregisterMBean(statsMBeanName))
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1175966&r1=1175965&r2=1175966&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala Mon Sep 26 17:34:48 2011
@@ -36,12 +36,19 @@ class KafkaScheduler(val numThreads: Int
t
}
})
-
+ executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
+ executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+
def scheduleWithRate(fun: () => Unit, delayMs: Long, periodMs: Long) =
executor.scheduleAtFixedRate(Utils.loggedRunnable(fun), delayMs, periodMs, TimeUnit.MILLISECONDS)
- def shutdown() = {
- executor.shutdownNow
+ def shutdownNow() {
+ executor.shutdownNow()
+ logger.info("force shutdown scheduler " + baseThreadName)
+ }
+
+ def shutdown() {
+ executor.shutdown()
logger.info("shutdown scheduler " + baseThreadName)
}
}