Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/25 08:23:19 UTC

svn commit: r1377220 - in /incubator/kafka/branches/0.8: config/ core/src/main/scala/kafka/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/metrics/ core/src/main/scala/kafka/producer/ core/src/main/scala/kafka/producer/async/ core/src/ma...

Author: junrao
Date: Sat Aug 25 06:23:18 2012
New Revision: 1377220

URL: http://svn.apache.org/viewvc?rev=1377220&view=rev
Log:
Log errors for unrecognized config options; patched by Jun Rao; reviewed by Jay Kreps; KAFKA-181
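
In short: config classes now wrap their java.util.Properties in the new kafka.utils.VerifiableProperties, which records every key that is actually read and, when verify() is called, logs a warning for any supplied key that was never referenced. A minimal usage sketch, assuming only the VerifiableProperties class added below (the example object, keys and values are hypothetical):

    import java.util.Properties
    import kafka.utils.VerifiableProperties

    // Hypothetical stand-alone example; not part of this commit.
    object VerifiablePropertiesExample {
      def main(args: Array[String]) {
        val original = new Properties()
        original.put("port", "9092")
        original.put("brokerid", "0")
        original.put("zk.connct", "localhost:2181")  // misspelled key: never read below

        val props = new VerifiableProperties(original)
        val port = props.getInt("port", 6667)                              // marks "port" as referenced
        val brokerId = props.getIntInRange("brokerid", (0, Int.MaxValue))  // marks "brokerid" as referenced

        // Referenced keys are logged as overridden; "zk.connct" was never read,
        // so verify() logs "Property zk.connct is not valid".
        props.verify()
      }
    }

The config classes below follow the same pattern: a private constructor takes the VerifiableProperties, the public constructor still accepts Properties and wraps it, and verify() runs either right away (ConsumerConfig, ProducerConfig) or after server startup (KafkaConfig, via KafkaServerStartable).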

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala
Modified:
    incubator/kafka/branches/0.8/config/server.properties
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Mx4jLoader.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala

Modified: incubator/kafka/branches/0.8/config/server.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/config/server.properties?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/config/server.properties (original)
+++ incubator/kafka/branches/0.8/config/server.properties Sat Aug 25 06:23:18 2012
@@ -104,9 +104,6 @@ log.cleanup.interval.mins=1
 
 ############################# Zookeeper #############################
 
-# Enable connecting to zookeeper
-enable.zookeeper=true
-
 # Zk connection string (see zk docs for details).
 # This is a comma-separated list of host:port pairs, each corresponding to a zk
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala Sat Aug 25 06:23:18 2012
@@ -33,10 +33,11 @@ object Kafka extends Logging {
     try {
       val props = Utils.loadProps(args(0))
       val serverConfig = new KafkaConfig(props)
-      val metricsConfig = new KafkaMetricsConfig(props)
+      val verifiableProps = serverConfig.props
+      val metricsConfig = new KafkaMetricsConfig(verifiableProps)
       metricsConfig.reporters.foreach(reporterType => {
         val reporter = Utils.getObject[KafkaMetricsReporter](reporterType)
-        reporter.init(props)
+        reporter.init(verifiableProps)
         if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
           Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
       })

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Sat Aug 25 06:23:18 2012
@@ -129,7 +129,7 @@ object ConsoleConsumer extends Logging {
     props.put("fetch.size", options.valueOf(fetchSizeOpt).toString)
     props.put("min.fetch.bytes", options.valueOf(minFetchBytesOpt).toString)
     props.put("max.fetch.wait.ms", options.valueOf(maxWaitMsOpt).toString)
-    props.put("auto.commit", "true")
+    props.put("autocommit.enable", "true")
     props.put("autocommit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString)
     props.put("autooffset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
     props.put("zk.connect", options.valueOf(zkConnectOpt))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Sat Aug 25 06:23:18 2012
@@ -18,8 +18,9 @@
 package kafka.consumer
 
 import java.util.Properties
-import kafka.utils.{ZKConfig, Utils}
 import kafka.api.OffsetRequest
+import kafka.utils.{VerifiableProperties, ZKConfig}
+
 object ConsumerConfig {
   val SocketTimeout = 30 * 1000
   val SocketBufferSize = 64*1024
@@ -43,62 +44,67 @@ object ConsumerConfig {
   val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
 }
 
-class ConsumerConfig(props: Properties) extends ZKConfig(props) {
+class ConsumerConfig private (props: VerifiableProperties) extends ZKConfig(props) {
   import ConsumerConfig._
 
+  def this(originalProps: Properties) {
+    this(new VerifiableProperties(originalProps))
+    props.verify()
+  }
+
   /** a string that uniquely identifies a set of consumers within the same consumer group */
-  val groupId = Utils.getString(props, "groupid")
+  val groupId = props.getString("groupid")
 
   /** consumer id: generated automatically if not set.
    *  Set this explicitly only for testing purposes. */
-  val consumerId: Option[String] = Option(Utils.getString(props, "consumerid", null))
+  val consumerId: Option[String] = Option(props.getString("consumerid", null))
 
   /** the socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. */
-  val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", SocketTimeout)
+  val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout)
   
   /** the socket receive buffer for network requests */
-  val socketBufferSize = Utils.getInt(props, "socket.buffersize", SocketBufferSize)
+  val socketBufferSize = props.getInt("socket.buffersize", SocketBufferSize)
   
   /** the number of bytes of messages to attempt to fetch */
-  val fetchSize = Utils.getInt(props, "fetch.size", FetchSize)
+  val fetchSize = props.getInt("fetch.size", FetchSize)
   
   /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
-  val autoCommit = Utils.getBoolean(props, "autocommit.enable", AutoCommit)
+  val autoCommit = props.getBoolean("autocommit.enable", AutoCommit)
   
   /** the frequency in ms that the consumer offsets are committed to zookeeper */
-  val autoCommitIntervalMs = Utils.getInt(props, "autocommit.interval.ms", AutoCommitInterval)
+  val autoCommitIntervalMs = props.getInt("autocommit.interval.ms", AutoCommitInterval)
 
   /** max number of messages buffered for consumption */
-  val maxQueuedChunks = Utils.getInt(props, "queuedchunks.max", MaxQueuedChunks)
+  val maxQueuedChunks = props.getInt("queuedchunks.max", MaxQueuedChunks)
 
   /** max number of retries during rebalance */
-  val maxRebalanceRetries = Utils.getInt(props, "rebalance.retries.max", MaxRebalanceRetries)
+  val maxRebalanceRetries = props.getInt("rebalance.retries.max", MaxRebalanceRetries)
   
   /** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
-  val minFetchBytes = Utils.getInt(props, "min.fetch.bytes", MinFetchBytes)
+  val minFetchBytes = props.getInt("min.fetch.bytes", MinFetchBytes)
   
   /** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy min.fetch.bytes */
-  val maxFetchWaitMs = Utils.getInt(props, "max.fetch.wait.ms", MaxFetchWaitMs)
+  val maxFetchWaitMs = props.getInt("max.fetch.wait.ms", MaxFetchWaitMs)
   
   /** backoff time between retries during rebalance */
-  val rebalanceBackoffMs = Utils.getInt(props, "rebalance.backoff.ms", zkSyncTimeMs)
+  val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs)
 
   /** backoff time to refresh the leader of a partition after it loses the current leader */
-  val refreshLeaderBackoffMs = Utils.getInt(props, "refresh.leader.backoff.ms", 200)
+  val refreshLeaderBackoffMs = props.getInt("refresh.leader.backoff.ms", 200)
 
   /* what to do if an offset is out of range.
      smallest : automatically reset the offset to the smallest offset
      largest : automatically reset the offset to the largest offset
      anything else: throw exception to the consumer */
-  val autoOffsetReset = Utils.getString(props, "autooffset.reset", AutoOffsetReset)
+  val autoOffsetReset = props.getString("autooffset.reset", AutoOffsetReset)
 
   /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
-  val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs)
+  val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs)
 
   /** Use shallow iterator over compressed messages directly. This feature should be used very carefully.
    *  Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the
    *  overhead of decompression.
    *  */
-  val enableShallowIterator = Utils.getBoolean(props, "shallowiterator.enable", false)
+  val enableShallowIterator = props.getBoolean("shallowiterator.enable", false)
 }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala Sat Aug 25 06:23:18 2012
@@ -20,12 +20,11 @@
 
 package kafka.metrics
 
-import java.util.Properties
 import com.yammer.metrics.Metrics
 import java.io.File
 import com.yammer.metrics.reporting.CsvReporter
-import kafka.utils.{Logging, Utils}
 import java.util.concurrent.TimeUnit
+import kafka.utils.{VerifiableProperties, Logging}
 
 
 private trait KafkaCSVMetricsReporterMBean extends KafkaMetricsReporterMBean
@@ -43,15 +42,15 @@ private class KafkaCSVMetricsReporter ex
   override def getMBeanName = "kafka:type=kafka.metrics.KafkaCSVMetricsReporter"
 
 
-  override def init(props: Properties) {
+  override def init(props: VerifiableProperties) {
     synchronized {
       if (!initialized) {
         val metricsConfig = new KafkaMetricsConfig(props)
-        csvDir = new File(Utils.getString(props, "kafka.csv.metrics.dir", "kafka_metrics"))
+        csvDir = new File(props.getString("kafka.csv.metrics.dir", "kafka_metrics"))
         if (!csvDir.exists())
           csvDir.mkdirs()
         underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir)
-        if (Utils.getBoolean(props, "kafka.csv.metrics.reporter.enabled", false))
+        if (props.getBoolean("kafka.csv.metrics.reporter.enabled", false))
           startReporter(metricsConfig.pollingIntervalSecs)
         initialized = true
       }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala Sat Aug 25 06:23:18 2012
@@ -20,19 +20,18 @@
 
 package kafka.metrics
 
-import java.util.Properties
-import kafka.utils.Utils
+import kafka.utils.{VerifiableProperties, Utils}
 
-class KafkaMetricsConfig(props: Properties) {
+class KafkaMetricsConfig(props: VerifiableProperties) {
 
   /**
    * Comma-separated list of reporter types. These classes should be on the
    * classpath and will be instantiated at run-time.
    */
-  val reporters = Utils.getCSVList(Utils.getString(props, "kafka.metrics.reporters", ""))
+  val reporters = Utils.getCSVList(props.getString("kafka.metrics.reporters", ""))
 
   /**
    * The metrics polling interval (in seconds).
    */
-  val pollingIntervalSecs = Utils.getInt(props, "kafka.metrics.polling.interval.secs", 10)
+  val pollingIntervalSecs = props.getInt("kafka.metrics.polling.interval.secs", 10)
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala Sat Aug 25 06:23:18 2012
@@ -20,7 +20,7 @@
 
 package kafka.metrics
 
-import java.util.Properties
+import kafka.utils.VerifiableProperties
 
 /**
  * Base trait for reporter MBeans. If a client wants to expose these JMX
@@ -42,6 +42,6 @@ trait KafkaMetricsReporterMBean {
 
 
 trait KafkaMetricsReporter {
-  def init(props: Properties)
+  def init(props: VerifiableProperties)
 }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala Sat Aug 25 06:23:18 2012
@@ -19,9 +19,16 @@ package kafka.producer
 
 import async.AsyncProducerConfig
 import java.util.Properties
-import kafka.utils.Utils
+import kafka.utils.{Utils, VerifiableProperties}
+import kafka.message.{CompressionCodec, NoCompressionCodec}
 
-class ProducerConfig(val props: Properties) extends AsyncProducerConfig with SyncProducerConfigShared{
+class ProducerConfig private (val props: VerifiableProperties)
+        extends AsyncProducerConfig with SyncProducerConfigShared {
+
+  def this(originalProps: Properties) {
+    this(new VerifiableProperties(originalProps))
+    props.verify()
+  }
 
   /** This is for bootstrapping and the producer will only use it for getting metadata
    * (topics, partitions and replicas). The socket connections for sending the actual data
@@ -29,27 +36,27 @@ class ProducerConfig(val props: Properti
    * format is host1:port1,host2:port2, and the list can be a subset of brokers or
    * a VIP pointing to a subset of brokers.
    */
-  val brokerList = Utils.getString(props, "broker.list")
+  val brokerList = props.getString("broker.list")
 
   /**
    * If DefaultEventHandler is used, this specifies the number of times to
    * retry if an error is encountered during send.
    */
-  val numRetries = Utils.getInt(props, "num.retries", 0)
+  val numRetries = props.getInt("num.retries", 0)
 
   /** the partitioner class for partitioning events amongst sub-topics */
-  val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")
+  val partitionerClass = props.getString("partitioner.class", "kafka.producer.DefaultPartitioner")
 
   /** this parameter specifies whether the messages are sent asynchronously *
    * or not. Valid values are - async for asynchronous send                 *
    *                            sync for synchronous send                   */
-  val producerType = Utils.getString(props, "producer.type", "sync")
+  val producerType = props.getString("producer.type", "sync")
 
   /**
    * This parameter allows you to specify the compression codec for all data generated *
    * by this producer. The default is NoCompressionCodec
    */
-  val compressionCodec = Utils.getCompressionCodec(props, "compression.codec")
+  val compressionCodec = CompressionCodec.getCompressionCodec(props.getInt("compression.codec", NoCompressionCodec.codec))
 
   /** This parameter allows you to set whether compression should be turned *
    *  on for particular topics
@@ -62,7 +69,7 @@ class ProducerConfig(val props: Properti
    *
    *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
    */
-  val compressedTopics = Utils.getCSVList(Utils.getString(props, "compressed.topics", null))
+  val compressedTopics = Utils.getCSVList(props.getString("compressed.topics", null))
 
   /**
    * The producer using the zookeeper software load balancer maintains a ZK cache that gets
@@ -72,7 +79,7 @@ class ProducerConfig(val props: Properti
    * ZK cache needs to be updated.
    * This parameter specifies the number of times the producer attempts to refresh this ZK cache.
    */
-  val producerRetries = Utils.getInt(props, "producer.num.retries", 3)
+  val producerRetries = props.getInt("producer.num.retries", 3)
 
-  val producerRetryBackoffMs = Utils.getInt(props, "producer.retry.backoff.ms", 100)
+  val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100)
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala Sat Aug 25 06:23:18 2012
@@ -37,7 +37,7 @@ object ProducerPool{
     val props = new Properties()
     props.put("host", broker.host)
     props.put("port", broker.port.toString)
-    props.putAll(config.props)
+    props.putAll(config.props.props)
     new SyncProducer(new SyncProducerConfig(props))
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Sat Aug 25 06:23:18 2012
@@ -17,48 +17,53 @@
 
 package kafka.producer
 
-import kafka.utils.Utils
 import java.util.Properties
+import kafka.utils.VerifiableProperties
+
+class SyncProducerConfig private (val props: VerifiableProperties) extends SyncProducerConfigShared {
+  def this(originalProps: Properties) {
+    this(new VerifiableProperties(originalProps))
+    // no need to verify the property since SyncProducerConfig is supposed to be used internally
+  }
 
-class SyncProducerConfig(val props: Properties) extends SyncProducerConfigShared {
   /** the broker to which the producer sends events */
-  val host = Utils.getString(props, "host")
+  val host = props.getString("host")
 
   /** the port on which the broker is running */
-  val port = Utils.getInt(props, "port")
+  val port = props.getInt("port")
 }
 
 trait SyncProducerConfigShared {
-  val props: Properties
+  val props: VerifiableProperties
   
-  val bufferSize = Utils.getInt(props, "buffer.size", 100*1024)
+  val bufferSize = props.getInt("buffer.size", 100*1024)
 
-  val connectTimeoutMs = Utils.getInt(props, "connect.timeout.ms", 5000)
+  val connectTimeoutMs = props.getInt("connect.timeout.ms", 5000)
 
-  val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)
+  val reconnectInterval = props.getInt("reconnect.interval", 30000)
 
   /** negative reconnect time interval means disabling this time-based reconnect feature */
-  var reconnectTimeInterval = Utils.getInt(props, "reconnect.time.interval.ms", 1000*1000*10)
+  var reconnectTimeInterval = props.getInt("reconnect.time.interval.ms", 1000*1000*10)
 
-  val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)
+  val maxMessageSize = props.getInt("max.message.size", 1000000)
 
   /* the client application sending the producer requests */
-  val correlationId = Utils.getInt(props,"producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId)
+  val correlationId = props.getInt("producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId)
 
   /* the client application sending the producer requests */
-  val clientId = Utils.getString(props,"producer.request.client_id",SyncProducerConfig.DefaultClientId)
+  val clientId = props.getString("producer.request.client_id",SyncProducerConfig.DefaultClientId)
 
   /*
    * The required acks of the producer requests - negative value means ack
    * after the replicas in ISR have caught up to the leader's offset
    * corresponding to this produce request.
    */
-  val requiredAcks = Utils.getShort(props,"producer.request.required.acks", SyncProducerConfig.DefaultRequiredAcks)
+  val requiredAcks = props.getShort("producer.request.required.acks", SyncProducerConfig.DefaultRequiredAcks)
 
   /*
    * The ack timeout of the producer requests. Value must be non-negative and non-zero
    */
-  val requestTimeoutMs = Utils.getIntInRange(props,"producer.request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs,
+  val requestTimeoutMs = props.getIntInRange("producer.request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs,
                                              (1, Integer.MAX_VALUE))
 }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala Sat Aug 25 06:23:18 2012
@@ -16,17 +16,16 @@
 */
 package kafka.producer.async
 
-import java.util.Properties
-import kafka.utils.Utils
+import kafka.utils.VerifiableProperties
 
 trait AsyncProducerConfig {
-  val props: Properties
+  val props: VerifiableProperties
 
   /* maximum time, in milliseconds, for buffering data on the producer queue */
-  val queueTime = Utils.getInt(props, "queue.time", 5000)
+  val queueTime = props.getInt("queue.time", 5000)
 
   /** the maximum size of the blocking queue for buffering on the producer */
-  val queueSize = Utils.getInt(props, "queue.size", 10000)
+  val queueSize = props.getInt("queue.size", 10000)
 
   /**
    * Timeout for event enqueue:
@@ -34,11 +33,11 @@ trait AsyncProducerConfig {
    * -ve: enqueue will block indefinitely if the queue is full
    * +ve: enqueue will block up to this many milliseconds if the queue is full
    */
-  val enqueueTimeoutMs = Utils.getInt(props, "queue.enqueueTimeout.ms", 0)
+  val enqueueTimeoutMs = props.getInt("queue.enqueueTimeout.ms", 0)
 
   /** the number of messages batched at the producer */
-  val batchSize = Utils.getInt(props, "batch.size", 200)
+  val batchSize = props.getInt("batch.size", 200)
 
   /** the serializer class for events */
-  val serializerClass = Utils.getString(props, "serializer.class", "kafka.serializer.DefaultEncoder")
+  val serializerClass = props.getString("serializer.class", "kafka.serializer.DefaultEncoder")
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Sat Aug 25 06:23:18 2012
@@ -18,142 +18,147 @@
 package kafka.server
 
 import java.util.Properties
-import kafka.utils.{Utils, ZKConfig}
 import kafka.message.Message
 import kafka.consumer.ConsumerConfig
 import java.net.InetAddress
-
-
+import kafka.utils.{Utils, VerifiableProperties, ZKConfig}
 
 /**
  * Configuration settings for the kafka server
  */
-class KafkaConfig(props: Properties) extends ZKConfig(props) {
+class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
+
+  def this(originalProps: Properties) {
+    this(new VerifiableProperties(originalProps))
+  }
+
+  def verify() = props.verify()
+
   /* the port to listen and accept connections on */
-  val port: Int = Utils.getInt(props, "port", 6667)
+  val port: Int = props.getInt("port", 6667)
 
   /* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */
-  val hostName: String = Utils.getString(props, "hostname", InetAddress.getLocalHost.getHostAddress)
+  val hostName: String = props.getString("hostname", InetAddress.getLocalHost.getHostAddress)
 
   /* the broker id for this server */
-  val brokerId: Int = Utils.getIntInRange(props, "brokerid", (0, Int.MaxValue))
+  val brokerId: Int = props.getIntInRange("brokerid", (0, Int.MaxValue))
 
   /* the SO_SNDBUFF buffer of the socket server sockets */
-  val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)
+  val socketSendBuffer: Int = props.getInt("socket.send.buffer", 100*1024)
   
   /* the SO_RCVBUFF buffer of the socket server sockets */
-  val socketReceiveBuffer: Int = Utils.getInt(props, "socket.receive.buffer", 100*1024)
+  val socketReceiveBuffer: Int = props.getInt("socket.receive.buffer", 100*1024)
   
   /* the maximum number of bytes in a socket request */
-  val maxSocketRequestSize: Int = Utils.getIntInRange(props, "max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
+  val maxSocketRequestSize: Int = props.getIntInRange("max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
   
   /* the number of network threads that the server uses for handling network requests */
-  val numNetworkThreads = Utils.getIntInRange(props, "network.threads", 3, (1, Int.MaxValue))
+  val numNetworkThreads = props.getIntInRange("network.threads", 3, (1, Int.MaxValue))
 
   /* the number of io threads that the server uses for carrying out network requests */
-  val numIoThreads = Utils.getIntInRange(props, "io.threads", 8, (1, Int.MaxValue))
+  val numIoThreads = props.getIntInRange("io.threads", 8, (1, Int.MaxValue))
   
   /* the number of queued requests allowed before blocking the network threads */
-  val numQueuedRequests = Utils.getIntInRange(props, "max.queued.requests", 500, (1, Int.MaxValue))
+  val numQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue))
 
   /* the interval in which to measure performance statistics */
-  val monitoringPeriodSecs = Utils.getIntInRange(props, "monitoring.period.secs", 600, (1, Int.MaxValue))
+  val monitoringPeriodSecs = props.getIntInRange("monitoring.period.secs", 600, (1, Int.MaxValue))
   
   /* the default number of log partitions per topic */
-  val numPartitions = Utils.getIntInRange(props, "num.partitions", 1, (1, Int.MaxValue))
+  val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue))
   
   /* the directory in which the log data is kept */
-  val logDir = Utils.getString(props, "log.dir")
+  val logDir = props.getString("log.dir")
   
   /* the maximum size of a single log file */
-  val logFileSize = Utils.getIntInRange(props, "log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
+  val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
 
   /* the maximum size of a single log file for some specific topic */
-  val logFileSizeMap = Utils.getTopicFileSize(Utils.getString(props, "topic.log.file.size", ""))
+  val logFileSizeMap = Utils.getTopicFileSize(props.getString("topic.log.file.size", ""))
 
   /* the maximum time before a new log segment is rolled out */
-  val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, Int.MaxValue))
+  val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
 
   /* the number of hours before rolling out a new log segment for some specific topic */
-  val logRollHoursMap = Utils.getTopicRollHours(Utils.getString(props, "topic.log.roll.hours", ""))
+  val logRollHoursMap = Utils.getTopicRollHours(props.getString("topic.log.roll.hours", ""))
 
   /* the number of hours to keep a log file before deleting it */
-  val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24*7, (1, Int.MaxValue))
+  val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
 
   /* the number of hours to keep a log file before deleting it for some specific topic*/
-  val logRetentionHoursMap = Utils.getTopicRetentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
+  val logRetentionHoursMap = Utils.getTopicRetentionHours(props.getString("topic.log.retention.hours", ""))
 
   /* the maximum size of the log before deleting it */
-  val logRetentionSize = Utils.getLong(props, "log.retention.size", -1)
+  val logRetentionSize = props.getLong("log.retention.size", -1)
 
   /* the maximum size of the log for some specific topic before deleting it */
-  val logRetentionSizeMap = Utils.getTopicRetentionSize(Utils.getString(props, "topic.log.retention.size", ""))
+  val logRetentionSizeMap = Utils.getTopicRetentionSize(props.getString("topic.log.retention.size", ""))
 
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
-  val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))
+  val logCleanupIntervalMinutes = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue))
 
   /* the number of messages accumulated on a log partition before messages are flushed to disk */
-  val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))
+  val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue))
 
   /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */
-  val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))
+  val flushIntervalMap = Utils.getTopicFlushIntervals(props.getString("topic.flush.intervals.ms", ""))
 
   /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
-  val flushSchedulerThreadRate = Utils.getInt(props, "log.default.flush.scheduler.interval.ms",  3000)
+  val flushSchedulerThreadRate = props.getInt("log.default.flush.scheduler.interval.ms",  3000)
 
   /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
-  val defaultFlushIntervalMs = Utils.getInt(props, "log.default.flush.interval.ms", flushSchedulerThreadRate)
+  val defaultFlushIntervalMs = props.getInt("log.default.flush.interval.ms", flushSchedulerThreadRate)
 
    /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
-  val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))
+  val topicPartitionsMap = Utils.getTopicPartitions(props.getString("topic.partition.count.map", ""))
 
   /* enable auto creation of topic on the server */
-  val autoCreateTopics = Utils.getBoolean(props, "auto.create.topics", true)
+  val autoCreateTopics = props.getBoolean("auto.create.topics", true)
 
   /**
    * Following properties are relevant to Kafka replication
    */
 
   /* the socket timeout for controller-to-broker channels */
-  val controllerSocketTimeoutMs = Utils.getInt(props, "controller.socket.timeout.ms", 30000)
+  val controllerSocketTimeoutMs = props.getInt("controller.socket.timeout.ms", 30000)
 
   /* the buffer size for controller-to-broker-channels */
-  val controllerMessageQueueSize= Utils.getInt(props, "controller.message.queue.size", 10)
+  val controllerMessageQueueSize= props.getInt("controller.message.queue.size", 10)
 
 
   /* default replication factors for automatically created topics */
-  val defaultReplicationFactor = Utils.getInt(props, "default.replication.factor", 1)
+  val defaultReplicationFactor = props.getInt("default.replication.factor", 1)
 
   /* wait time in ms to allow the preferred replica for a partition to become the leader. This property is used during
   * leader election on all replicas minus the preferred replica */
-  val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
+  val preferredReplicaWaitTime = props.getLong("preferred.replica.wait.time", 300)
 
-  val replicaMaxLagTimeMs = Utils.getLong(props, "replica.max.lag.time.ms", 10000)
+  val replicaMaxLagTimeMs = props.getLong("replica.max.lag.time.ms", 10000)
 
-  val replicaMaxLagBytes = Utils.getLong(props, "replica.max.lag.bytes", 4000)
+  val replicaMaxLagBytes = props.getLong("replica.max.lag.bytes", 4000)
 
   /* size of the state change request queue in Zookeeper */
-  val stateChangeQSize = Utils.getInt(props, "state.change.queue.size", 1000)
+  val stateChangeQSize = props.getInt("state.change.queue.size", 1000)
 
   /**
    * Config options relevant to a follower for a replica
    */
   /** the socket timeout for network requests */
-  val replicaSocketTimeoutMs = Utils.getInt(props, "replica.socket.timeout.ms", ConsumerConfig.SocketTimeout)
+  val replicaSocketTimeoutMs = props.getInt("replica.socket.timeout.ms", ConsumerConfig.SocketTimeout)
 
   /** the socket receive buffer for network requests */
-  val replicaSocketBufferSize = Utils.getInt(props, "replica.socket.buffersize", ConsumerConfig.SocketBufferSize)
+  val replicaSocketBufferSize = props.getInt("replica.socket.buffersize", ConsumerConfig.SocketBufferSize)
 
   /** the number of bytes of messages to attempt to fetch */
-  val replicaFetchSize = Utils.getInt(props, "replica.fetch.size", ConsumerConfig.FetchSize)
+  val replicaFetchSize = props.getInt("replica.fetch.size", ConsumerConfig.FetchSize)
 
   /** max wait time for each fetcher request issued by follower replicas*/
-  val replicaMaxWaitTimeMs = Utils.getInt(props, "replica.fetch.wait.time.ms", 500)
+  val replicaMaxWaitTimeMs = props.getInt("replica.fetch.wait.time.ms", 500)
 
   /** minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */
-  val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4096)
+  val replicaMinBytes = props.getInt("replica.fetch.min.bytes", 4096)
 
   /* number of fetcher threads used to replicate messages from a source broker.
   *  Increasing this value can increase the degree of I/O parallelism in the follower broker. */
-  val numReplicaFetchers = Utils.getInt(props, "replica.fetchers", 1)
+  val numReplicaFetchers = props.getInt("replica.fetchers", 1)
  }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala Sat Aug 25 06:23:18 2012
@@ -32,6 +32,7 @@ class KafkaServerStartable(val serverCon
   def startup() {
     try {
       server.startup()
+      serverConfig.verify()
     }
     catch {
       case e =>

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Mx4jLoader.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Mx4jLoader.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Mx4jLoader.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Mx4jLoader.scala Sat Aug 25 06:23:18 2012
@@ -33,10 +33,11 @@ import javax.management.ObjectName
 object Mx4jLoader extends Logging {
 
   def maybeLoad(): Boolean = {
-    if (!Utils.getBoolean(System.getProperties(), "kafka_mx4jenable", false))
+    val props = new VerifiableProperties(System.getProperties())
+    if (props.getBoolean("kafka_mx4jenable", false))
       false
-    val address = System.getProperty("mx4jaddress", "0.0.0.0")
-    val port = Utils.getInt(System.getProperties(), "mx4jport", 8082)
+    val address = props.getString("mx4jaddress", "0.0.0.0")
+    val port = props.getInt("mx4jport", 8082)
     try {
       debug("Will try to load MX4j now, if it's in the classpath");
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Sat Aug 25 06:23:18 2012
@@ -209,62 +209,6 @@ object Utils extends Logging {
     props.load(propStream)
     props
   }
-  
-  /**
-   * Read a required integer property value or throw an exception if no such property is found
-   */
-  def getInt(props: Properties, name: String): Int = {
-    require(props.containsKey(name), "Missing required property '" + name + "'")
-    return getInt(props, name, -1)
-  }
-
-  def getIntInRange(props: Properties, name: String, range: (Int, Int)): Int = {
-    require(props.containsKey(name), "Missing required property '" + name + "'")
-    getIntInRange(props, name, -1, range)
-  }
-
-  /**
-   * Read an integer from the properties instance
-   * @param props The properties to read from
-   * @param name The property name
-   * @param default The default value to use if the property is not found
-   * @return the integer value
-   */
-  def getInt(props: Properties, name: String, default: Int): Int = 
-    getIntInRange(props, name, default, (Int.MinValue, Int.MaxValue))
-  
-  def getShort(props: Properties, name: String, default: Short): Short = 
-    getShortInRange(props, name, default, (Short.MinValue, Short.MaxValue))
-
-  /**
-   * Read an integer from the properties instance. Throw an exception 
-   * if the value is not in the given range (inclusive)
-   * @param props The properties to read from
-   * @param name The property name
-   * @param default The default value to use if the property is not found
-   * @param range The range in which the value must fall (inclusive)
-   * @throws IllegalArgumentException If the value is not in the given range
-   * @return the integer value
-   */
-  def getIntInRange(props: Properties, name: String, default: Int, range: (Int, Int)): Int = {
-    val v = 
-      if(props.containsKey(name))
-        props.getProperty(name).toInt
-      else
-        default
-    require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
-    v
-  }
-
- def getShortInRange(props: Properties, name: String, default: Short, range: (Short, Short)): Short = {
-    val v = 
-      if(props.containsKey(name))
-        props.getProperty(name).toShort
-      else
-        default
-    require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
-    v
-  }
 
   def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = {
     val value = buffer.getInt
@@ -288,115 +232,6 @@ object Utils extends Logging {
   }
 
   /**
-   * Read a required long property value or throw an exception if no such property is found
-   */
-  def getLong(props: Properties, name: String): Long = {
-    require(props.containsKey(name), "Missing required property '" + name + "'")
-    return getLong(props, name, -1)
-  }
-
-  /**
-   * Read an long from the properties instance
-   * @param props The properties to read from
-   * @param name The property name
-   * @param default The default value to use if the property is not found
-   * @return the long value
-   */
-  def getLong(props: Properties, name: String, default: Long): Long = 
-    getLongInRange(props, name, default, (Long.MinValue, Long.MaxValue))
-
-  /**
-   * Read an long from the properties instance. Throw an exception 
-   * if the value is not in the given range (inclusive)
-   * @param props The properties to read from
-   * @param name The property name
-   * @param default The default value to use if the property is not found
-   * @param range The range in which the value must fall (inclusive)
-   * @throws IllegalArgumentException If the value is not in the given range
-   * @return the long value
-   */
-  def getLongInRange(props: Properties, name: String, default: Long, range: (Long, Long)): Long = {
-    val v = 
-      if(props.containsKey(name))
-        props.getProperty(name).toLong
-      else
-        default
-    require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
-    v
-  }
-
-  /**
-   * Read a boolean value from the properties instance
-   * @param props The properties to read from
-   * @param name The property name
-   * @param default The default value to use if the property is not found
-   * @return the boolean value
-   */
-  def getBoolean(props: Properties, name: String, default: Boolean): Boolean = {
-    if(!props.containsKey(name))
-      default
-    else {
-      val v = props.getProperty(name)
-      require(v == "true" || v == "false", "Unacceptable value for property '" + name + "', boolean values must be either 'true' or 'false")
-      v.toBoolean
-    }
-  }
-  
-  /**
-   * Get a string property, or, if no such property is defined, return the given default value
-   */
-  def getString(props: Properties, name: String, default: String): String = {
-    if(props.containsKey(name))
-      props.getProperty(name)
-    else
-      default
-  }
-  
-  /**
-   * Get a string property or throw and exception if no such property is defined.
-   */
-  def getString(props: Properties, name: String): String = {
-    require(props.containsKey(name), "Missing required property '" + name + "'")
-    props.getProperty(name)
-  }
-
-  /**
-   * Get a property of type java.util.Properties or throw and exception if no such property is defined.
-   */
-  def getProps(props: Properties, name: String): Properties = {
-    require(props.containsKey(name), "Missing required property '" + name + "'")
-    val propString = props.getProperty(name)
-    val propValues = propString.split(",")
-    val properties = new Properties
-    for(i <- 0 until propValues.length) {
-      val prop = propValues(i).split("=")
-      require(prop.length == 2, "Illegal format of specifying properties '" + propValues(i) + "'")
-      properties.put(prop(0), prop(1))
-    }
-    properties
-  }
-
-  /**
-   * Get a property of type java.util.Properties or return the default if no such property is defined
-   */
-  def getProps(props: Properties, name: String, default: Properties): Properties = {
-    if(props.containsKey(name)) {
-      val propString = props.getProperty(name)
-      val propValues = propString.split(",")
-      require(propValues.length >= 1, "Illegal format of specifying properties '" + propString + "'")
-      val properties = new Properties
-      for(i <- 0 until propValues.length) {
-        val prop = propValues(i).split("=")
-        require(prop.length == 2, "Illegal format of specifying properties '" + propValues(i) + "'")
-        properties.put(prop(0), prop(1))
-      }
-      properties
-    }
-    else
-      default
-  }
-
-  /**
    * Open a channel for the given file
    */
   def openChannel(file: File, mutable: Boolean): FileChannel = {
@@ -775,14 +610,6 @@ object Utils extends Logging {
     else true
   }
 
-  def getCompressionCodec(props: Properties, codec: String): CompressionCodec = {
-    val codecValueString = props.getProperty(codec)
-    if(codecValueString == null)
-      NoCompressionCodec
-    else
-      CompressionCodec.getCompressionCodec(codecValueString.toInt)
-  }
-
   def tryCleanupZookeeper(zkUrl: String, groupId: String) {
     try {
       val dir = "/consumers/" + groupId

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala?rev=1377220&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala Sat Aug 25 06:23:18 2012
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.util.Properties
+import collection.mutable
+
+class VerifiableProperties(val props: Properties) extends Logging {
+  private val referenceSet = mutable.HashSet[String]()
+
+  def containsKey(name: String): Boolean = {
+    props.containsKey(name)
+  }
+
+  def getProperty(name: String): String = {
+    val value = props.getProperty(name)
+    referenceSet.add(name)
+    return value
+  }
+
+  /**
+   * Read a required integer property value or throw an exception if no such property is found
+   */
+  def getInt(name: String): Int = {
+    require(containsKey(name), "Missing required property '" + name + "'")
+    return getInt(name, -1)
+  }
+
+  def getIntInRange(name: String, range: (Int, Int)): Int = {
+    require(containsKey(name), "Missing required property '" + name + "'")
+    getIntInRange(name, -1, range)
+  }
+
+  /**
+   * Read an integer from the properties instance
+   * @param name The property name
+   * @param default The default value to use if the property is not found
+   * @return the integer value
+   */
+  def getInt(name: String, default: Int): Int =
+    getIntInRange(name, default, (Int.MinValue, Int.MaxValue))
+
+  def getShort(name: String, default: Short): Short =
+    getShortInRange(name, default, (Short.MinValue, Short.MaxValue))
+
+  /**
+   * Read an integer from the properties instance. Throw an exception
+   * if the value is not in the given range (inclusive)
+   * @param name The property name
+   * @param default The default value to use if the property is not found
+   * @param range The range in which the value must fall (inclusive)
+   * @throws IllegalArgumentException If the value is not in the given range
+   * @return the integer value
+   */
+  def getIntInRange(name: String, default: Int, range: (Int, Int)): Int = {
+    val v =
+      if(containsKey(name))
+        getProperty(name).toInt
+      else
+        default
+    require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
+    v
+  }
+
+ def getShortInRange(name: String, default: Short, range: (Short, Short)): Short = {
+    val v =
+      if(containsKey(name))
+        getProperty(name).toShort
+      else
+        default
+    require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
+    v
+  }
+
+  /**
+   * Read a required long property value or throw an exception if no such property is found
+   */
+  def getLong(name: String): Long = {
+    require(containsKey(name), "Missing required property '" + name + "'")
+    return getLong(name, -1)
+  }
+
+  /**
+   * Read a long from the properties instance
+   * @param name The property name
+   * @param default The default value to use if the property is not found
+   * @return the long value
+   */
+  def getLong(name: String, default: Long): Long =
+    getLongInRange(name, default, (Long.MinValue, Long.MaxValue))
+
+  /**
+   * Read a long from the properties instance. Throw an exception
+   * if the value is not in the given range (inclusive)
+   * @param name The property name
+   * @param default The default value to use if the property is not found
+   * @param range The range in which the value must fall (inclusive)
+   * @throws IllegalArgumentException If the value is not in the given range
+   * @return the long value
+   */
+  def getLongInRange(name: String, default: Long, range: (Long, Long)): Long = {
+    val v =
+      if(containsKey(name))
+        getProperty(name).toLong
+      else
+        default
+    require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
+    v
+  }
+
+  /**
+   * Read a boolean value from the properties instance
+   * @param name The property name
+   * @param default The default value to use if the property is not found
+   * @return the boolean value
+   */
+  def getBoolean(name: String, default: Boolean): Boolean = {
+    if(!containsKey(name))
+      default
+    else {
+      val v = getProperty(name)
+      require(v == "true" || v == "false", "Unacceptable value for property '" + name + "', boolean values must be either 'true' or 'false'")
+      v.toBoolean
+    }
+  }
+
+  /**
+   * Get a string property, or, if no such property is defined, return the given default value
+   */
+  def getString(name: String, default: String): String = {
+    if(containsKey(name))
+      getProperty(name)
+    else
+      default
+  }
+
+  /**
+   * Get a string property or throw an exception if no such property is defined.
+   */
+  def getString(name: String): String = {
+    require(containsKey(name), "Missing required property '" + name + "'")
+    getProperty(name)
+  }
+
+  def verify() {
+    info("Verifying properties")
+    val specifiedProperties = props.propertyNames()
+    while (specifiedProperties.hasMoreElements) {
+      val key = specifiedProperties.nextElement().asInstanceOf[String]
+      if (!referenceSet.contains(key))
+        warn("Property %s is not valid".format(key))
+      else
+        info("Property %s is overridden to %s".format(key, props.getProperty(key)))
+    }
+  }
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Sat Aug 25 06:23:18 2012
@@ -17,7 +17,6 @@
 
 package kafka.utils
 
-import java.util.Properties
 import kafka.cluster.{Broker, Cluster}
 import kafka.consumer.TopicCount
 import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
@@ -600,16 +599,16 @@ class ZKGroupTopicDirs(group: String, to
 }
 
 
-class ZKConfig(props: Properties) {
+class ZKConfig(props: VerifiableProperties) {
   /** ZK host string */
-  val zkConnect = Utils.getString(props, "zk.connect", null)
+  val zkConnect = props.getString("zk.connect", null)
 
   /** zookeeper session timeout */
-  val zkSessionTimeoutMs = Utils.getInt(props, "zk.sessiontimeout.ms", 6000)
+  val zkSessionTimeoutMs = props.getInt("zk.sessiontimeout.ms", 6000)
 
   /** the max time that the client waits to establish a connection to zookeeper */
-  val zkConnectionTimeoutMs = Utils.getInt(props, "zk.connectiontimeout.ms",zkSessionTimeoutMs)
+  val zkConnectionTimeoutMs = props.getInt("zk.connectiontimeout.ms",zkSessionTimeoutMs)
 
   /** how far a ZK follower can be behind a ZK leader */
-  val zkSyncTimeMs = Utils.getInt(props, "zk.synctime.ms", 2000)
+  val zkSyncTimeMs = props.getInt("zk.synctime.ms", 2000)
 }