You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/25 08:23:19 UTC
svn commit: r1377220 - in /incubator/kafka/branches/0.8: config/
core/src/main/scala/kafka/ core/src/main/scala/kafka/consumer/
core/src/main/scala/kafka/metrics/ core/src/main/scala/kafka/producer/
core/src/main/scala/kafka/producer/async/ core/src/ma...
Author: junrao
Date: Sat Aug 25 06:23:18 2012
New Revision: 1377220
URL: http://svn.apache.org/viewvc?rev=1377220&view=rev
Log:
Log errors for unrecognized config options; patched by Jun Rao; reviewed by Jay Kreps; KAFKA-181
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala
Modified:
incubator/kafka/branches/0.8/config/server.properties
incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Mx4jLoader.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
Modified: incubator/kafka/branches/0.8/config/server.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/config/server.properties?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/config/server.properties (original)
+++ incubator/kafka/branches/0.8/config/server.properties Sat Aug 25 06:23:18 2012
@@ -104,9 +104,6 @@ log.cleanup.interval.mins=1
############################# Zookeeper #############################
-# Enable connecting to zookeeper
-enable.zookeeper=true
-
# Zk connection string (see zk docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala Sat Aug 25 06:23:18 2012
@@ -33,10 +33,11 @@ object Kafka extends Logging {
try {
val props = Utils.loadProps(args(0))
val serverConfig = new KafkaConfig(props)
- val metricsConfig = new KafkaMetricsConfig(props)
+ val verifiableProps = serverConfig.props
+ val metricsConfig = new KafkaMetricsConfig(verifiableProps)
metricsConfig.reporters.foreach(reporterType => {
val reporter = Utils.getObject[KafkaMetricsReporter](reporterType)
- reporter.init(props)
+ reporter.init(verifiableProps)
if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
})
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Sat Aug 25 06:23:18 2012
@@ -129,7 +129,7 @@ object ConsoleConsumer extends Logging {
props.put("fetch.size", options.valueOf(fetchSizeOpt).toString)
props.put("min.fetch.bytes", options.valueOf(minFetchBytesOpt).toString)
props.put("max.fetch.wait.ms", options.valueOf(maxWaitMsOpt).toString)
- props.put("auto.commit", "true")
+ props.put("autocommit.enable", "true")
props.put("autocommit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString)
props.put("autooffset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
props.put("zk.connect", options.valueOf(zkConnectOpt))
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Sat Aug 25 06:23:18 2012
@@ -18,8 +18,9 @@
package kafka.consumer
import java.util.Properties
-import kafka.utils.{ZKConfig, Utils}
import kafka.api.OffsetRequest
+import kafka.utils.{VerifiableProperties, ZKConfig}
+
object ConsumerConfig {
val SocketTimeout = 30 * 1000
val SocketBufferSize = 64*1024
@@ -43,62 +44,67 @@ object ConsumerConfig {
val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
}
-class ConsumerConfig(props: Properties) extends ZKConfig(props) {
+class ConsumerConfig private (props: VerifiableProperties) extends ZKConfig(props) {
import ConsumerConfig._
+ def this(originalProps: Properties) {
+ this(new VerifiableProperties(originalProps))
+ props.verify()
+ }
+
/** a string that uniquely identifies a set of consumers within the same consumer group */
- val groupId = Utils.getString(props, "groupid")
+ val groupId = props.getString("groupid")
/** consumer id: generated automatically if not set.
* Set this explicitly for only testing purpose. */
- val consumerId: Option[String] = Option(Utils.getString(props, "consumerid", null))
+ val consumerId: Option[String] = Option(props.getString("consumerid", null))
/** the socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. */
- val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", SocketTimeout)
+ val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout)
/** the socket receive buffer for network requests */
- val socketBufferSize = Utils.getInt(props, "socket.buffersize", SocketBufferSize)
+ val socketBufferSize = props.getInt("socket.buffersize", SocketBufferSize)
/** the number of byes of messages to attempt to fetch */
- val fetchSize = Utils.getInt(props, "fetch.size", FetchSize)
+ val fetchSize = props.getInt("fetch.size", FetchSize)
/** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
- val autoCommit = Utils.getBoolean(props, "autocommit.enable", AutoCommit)
+ val autoCommit = props.getBoolean("autocommit.enable", AutoCommit)
/** the frequency in ms that the consumer offsets are committed to zookeeper */
- val autoCommitIntervalMs = Utils.getInt(props, "autocommit.interval.ms", AutoCommitInterval)
+ val autoCommitIntervalMs = props.getInt("autocommit.interval.ms", AutoCommitInterval)
/** max number of messages buffered for consumption */
- val maxQueuedChunks = Utils.getInt(props, "queuedchunks.max", MaxQueuedChunks)
+ val maxQueuedChunks = props.getInt("queuedchunks.max", MaxQueuedChunks)
/** max number of retries during rebalance */
- val maxRebalanceRetries = Utils.getInt(props, "rebalance.retries.max", MaxRebalanceRetries)
+ val maxRebalanceRetries = props.getInt("rebalance.retries.max", MaxRebalanceRetries)
/** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
- val minFetchBytes = Utils.getInt(props, "min.fetch.bytes", MinFetchBytes)
+ val minFetchBytes = props.getInt("min.fetch.bytes", MinFetchBytes)
/** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediate satisfy min.fetch.bytes */
- val maxFetchWaitMs = Utils.getInt(props, "max.fetch.wait.ms", MaxFetchWaitMs)
+ val maxFetchWaitMs = props.getInt("max.fetch.wait.ms", MaxFetchWaitMs)
/** backoff time between retries during rebalance */
- val rebalanceBackoffMs = Utils.getInt(props, "rebalance.backoff.ms", zkSyncTimeMs)
+ val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs)
/** backoff time to refresh the leader of a partition after it loses the current leader */
- val refreshLeaderBackoffMs = Utils.getInt(props, "refresh.leader.backoff.ms", 200)
+ val refreshLeaderBackoffMs = props.getInt("refresh.leader.backoff.ms", 200)
/* what to do if an offset is out of range.
smallest : automatically reset the offset to the smallest offset
largest : automatically reset the offset to the largest offset
anything else: throw exception to the consumer */
- val autoOffsetReset = Utils.getString(props, "autooffset.reset", AutoOffsetReset)
+ val autoOffsetReset = props.getString("autooffset.reset", AutoOffsetReset)
/** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
- val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs)
+ val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs)
/** Use shallow iterator over compressed messages directly. This feature should be used very carefully.
* Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the
* overhead of decompression.
* */
- val enableShallowIterator = Utils.getBoolean(props, "shallowiterator.enable", false)
+ val enableShallowIterator = props.getBoolean("shallowiterator.enable", false)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala Sat Aug 25 06:23:18 2012
@@ -20,12 +20,11 @@
package kafka.metrics
-import java.util.Properties
import com.yammer.metrics.Metrics
import java.io.File
import com.yammer.metrics.reporting.CsvReporter
-import kafka.utils.{Logging, Utils}
import java.util.concurrent.TimeUnit
+import kafka.utils.{VerifiableProperties, Logging}
private trait KafkaCSVMetricsReporterMBean extends KafkaMetricsReporterMBean
@@ -43,15 +42,15 @@ private class KafkaCSVMetricsReporter ex
override def getMBeanName = "kafka:type=kafka.metrics.KafkaCSVMetricsReporter"
- override def init(props: Properties) {
+ override def init(props: VerifiableProperties) {
synchronized {
if (!initialized) {
val metricsConfig = new KafkaMetricsConfig(props)
- csvDir = new File(Utils.getString(props, "kafka.csv.metrics.dir", "kafka_metrics"))
+ csvDir = new File(props.getString("kafka.csv.metrics.dir", "kafka_metrics"))
if (!csvDir.exists())
csvDir.mkdirs()
underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir)
- if (Utils.getBoolean(props, "kafka.csv.metrics.reporter.enabled", false))
+ if (props.getBoolean("kafka.csv.metrics.reporter.enabled", false))
startReporter(metricsConfig.pollingIntervalSecs)
initialized = true
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala Sat Aug 25 06:23:18 2012
@@ -20,19 +20,18 @@
package kafka.metrics
-import java.util.Properties
-import kafka.utils.Utils
+import kafka.utils.{VerifiableProperties, Utils}
-class KafkaMetricsConfig(props: Properties) {
+class KafkaMetricsConfig(props: VerifiableProperties) {
/**
* Comma-separated list of reporter types. These classes should be on the
* classpath and will be instantiated at run-time.
*/
- val reporters = Utils.getCSVList(Utils.getString(props, "kafka.metrics.reporters", ""))
+ val reporters = Utils.getCSVList(props.getString("kafka.metrics.reporters", ""))
/**
* The metrics polling interval (in seconds).
*/
- val pollingIntervalSecs = Utils.getInt(props, "kafka.metrics.polling.interval.secs", 10)
+ val pollingIntervalSecs = props.getInt("kafka.metrics.polling.interval.secs", 10)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala Sat Aug 25 06:23:18 2012
@@ -20,7 +20,7 @@
package kafka.metrics
-import java.util.Properties
+import kafka.utils.VerifiableProperties
/**
* Base trait for reporter MBeans. If a client wants to expose these JMX
@@ -42,6 +42,6 @@ trait KafkaMetricsReporterMBean {
trait KafkaMetricsReporter {
- def init(props: Properties)
+ def init(props: VerifiableProperties)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala Sat Aug 25 06:23:18 2012
@@ -19,9 +19,16 @@ package kafka.producer
import async.AsyncProducerConfig
import java.util.Properties
-import kafka.utils.Utils
+import kafka.utils.{Utils, VerifiableProperties}
+import kafka.message.{CompressionCodec, NoCompressionCodec}
-class ProducerConfig(val props: Properties) extends AsyncProducerConfig with SyncProducerConfigShared{
+class ProducerConfig private (val props: VerifiableProperties)
+ extends AsyncProducerConfig with SyncProducerConfigShared {
+
+ def this(originalProps: Properties) {
+ this(new VerifiableProperties(originalProps))
+ props.verify()
+ }
/** This is for bootstrapping and the producer will only use it for getting metadata
* (topics, partitions and replicas). The socket connections for sending the actual data
@@ -29,27 +36,27 @@ class ProducerConfig(val props: Properti
* format is host1:por1,host2:port2, and the list can be a subset of brokers or
* a VIP pointing to a subset of brokers.
*/
- val brokerList = Utils.getString(props, "broker.list")
+ val brokerList = props.getString("broker.list")
/**
* If DefaultEventHandler is used, this specifies the number of times to
* retry if an error is encountered during send.
*/
- val numRetries = Utils.getInt(props, "num.retries", 0)
+ val numRetries = props.getInt("num.retries", 0)
/** the partitioner class for partitioning events amongst sub-topics */
- val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")
+ val partitionerClass = props.getString("partitioner.class", "kafka.producer.DefaultPartitioner")
/** this parameter specifies whether the messages are sent asynchronously *
* or not. Valid values are - async for asynchronous send *
* sync for synchronous send */
- val producerType = Utils.getString(props, "producer.type", "sync")
+ val producerType = props.getString("producer.type", "sync")
/**
* This parameter allows you to specify the compression codec for all data generated *
* by this producer. The default is NoCompressionCodec
*/
- val compressionCodec = Utils.getCompressionCodec(props, "compression.codec")
+ val compressionCodec = CompressionCodec.getCompressionCodec(props.getInt("compression.codec", NoCompressionCodec.codec))
/** This parameter allows you to set whether compression should be turned *
* on for particular topics
@@ -62,7 +69,7 @@ class ProducerConfig(val props: Properti
*
* If the compression codec is NoCompressionCodec, compression is disabled for all topics
*/
- val compressedTopics = Utils.getCSVList(Utils.getString(props, "compressed.topics", null))
+ val compressedTopics = Utils.getCSVList(props.getString("compressed.topics", null))
/**
* The producer using the zookeeper software load balancer maintains a ZK cache that gets
@@ -72,7 +79,7 @@ class ProducerConfig(val props: Properti
* ZK cache needs to be updated.
* This parameter specifies the number of times the producer attempts to refresh this ZK cache.
*/
- val producerRetries = Utils.getInt(props, "producer.num.retries", 3)
+ val producerRetries = props.getInt("producer.num.retries", 3)
- val producerRetryBackoffMs = Utils.getInt(props, "producer.retry.backoff.ms", 100)
+ val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala Sat Aug 25 06:23:18 2012
@@ -37,7 +37,7 @@ object ProducerPool{
val props = new Properties()
props.put("host", broker.host)
props.put("port", broker.port.toString)
- props.putAll(config.props)
+ props.putAll(config.props.props)
new SyncProducer(new SyncProducerConfig(props))
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Sat Aug 25 06:23:18 2012
@@ -17,48 +17,53 @@
package kafka.producer
-import kafka.utils.Utils
import java.util.Properties
+import kafka.utils.VerifiableProperties
+
+class SyncProducerConfig private (val props: VerifiableProperties) extends SyncProducerConfigShared {
+ def this(originalProps: Properties) {
+ this(new VerifiableProperties(originalProps))
+ // no need to verify the property since SyncProducerConfig is supposed to be used internally
+ }
-class SyncProducerConfig(val props: Properties) extends SyncProducerConfigShared {
/** the broker to which the producer sends events */
- val host = Utils.getString(props, "host")
+ val host = props.getString("host")
/** the port on which the broker is running */
- val port = Utils.getInt(props, "port")
+ val port = props.getInt("port")
}
trait SyncProducerConfigShared {
- val props: Properties
+ val props: VerifiableProperties
- val bufferSize = Utils.getInt(props, "buffer.size", 100*1024)
+ val bufferSize = props.getInt("buffer.size", 100*1024)
- val connectTimeoutMs = Utils.getInt(props, "connect.timeout.ms", 5000)
+ val connectTimeoutMs = props.getInt("connect.timeout.ms", 5000)
- val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)
+ val reconnectInterval = props.getInt("reconnect.interval", 30000)
/** negative reconnect time interval means disabling this time-based reconnect feature */
- var reconnectTimeInterval = Utils.getInt(props, "reconnect.time.interval.ms", 1000*1000*10)
+ var reconnectTimeInterval = props.getInt("reconnect.time.interval.ms", 1000*1000*10)
- val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)
+ val maxMessageSize = props.getInt("max.message.size", 1000000)
/* the client application sending the producer requests */
- val correlationId = Utils.getInt(props,"producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId)
+ val correlationId = props.getInt("producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId)
/* the client application sending the producer requests */
- val clientId = Utils.getString(props,"producer.request.client_id",SyncProducerConfig.DefaultClientId)
+ val clientId = props.getString("producer.request.client_id",SyncProducerConfig.DefaultClientId)
/*
* The required acks of the producer requests - negative value means ack
* after the replicas in ISR have caught up to the leader's offset
* corresponding to this produce request.
*/
- val requiredAcks = Utils.getShort(props,"producer.request.required.acks", SyncProducerConfig.DefaultRequiredAcks)
+ val requiredAcks = props.getShort("producer.request.required.acks", SyncProducerConfig.DefaultRequiredAcks)
/*
* The ack timeout of the producer requests. Value must be non-negative and non-zero
*/
- val requestTimeoutMs = Utils.getIntInRange(props,"producer.request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs,
+ val requestTimeoutMs = props.getIntInRange("producer.request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs,
(1, Integer.MAX_VALUE))
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala Sat Aug 25 06:23:18 2012
@@ -16,17 +16,16 @@
*/
package kafka.producer.async
-import java.util.Properties
-import kafka.utils.Utils
+import kafka.utils.VerifiableProperties
trait AsyncProducerConfig {
- val props: Properties
+ val props: VerifiableProperties
/* maximum time, in milliseconds, for buffering data on the producer queue */
- val queueTime = Utils.getInt(props, "queue.time", 5000)
+ val queueTime = props.getInt("queue.time", 5000)
/** the maximum size of the blocking queue for buffering on the producer */
- val queueSize = Utils.getInt(props, "queue.size", 10000)
+ val queueSize = props.getInt("queue.size", 10000)
/**
* Timeout for event enqueue:
@@ -34,11 +33,11 @@ trait AsyncProducerConfig {
* -ve: enqueue will block indefinitely if the queue is full
* +ve: enqueue will block up to this many milliseconds if the queue is full
*/
- val enqueueTimeoutMs = Utils.getInt(props, "queue.enqueueTimeout.ms", 0)
+ val enqueueTimeoutMs = props.getInt("queue.enqueueTimeout.ms", 0)
/** the number of messages batched at the producer */
- val batchSize = Utils.getInt(props, "batch.size", 200)
+ val batchSize = props.getInt("batch.size", 200)
/** the serializer class for events */
- val serializerClass = Utils.getString(props, "serializer.class", "kafka.serializer.DefaultEncoder")
+ val serializerClass = props.getString("serializer.class", "kafka.serializer.DefaultEncoder")
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Sat Aug 25 06:23:18 2012
@@ -18,142 +18,147 @@
package kafka.server
import java.util.Properties
-import kafka.utils.{Utils, ZKConfig}
import kafka.message.Message
import kafka.consumer.ConsumerConfig
import java.net.InetAddress
-
-
+import kafka.utils.{Utils, VerifiableProperties, ZKConfig}
/**
* Configuration settings for the kafka server
*/
-class KafkaConfig(props: Properties) extends ZKConfig(props) {
+class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
+
+ def this(originalProps: Properties) {
+ this(new VerifiableProperties(originalProps))
+ }
+
+ def verify() = props.verify()
+
/* the port to listen and accept connections on */
- val port: Int = Utils.getInt(props, "port", 6667)
+ val port: Int = props.getInt("port", 6667)
/* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */
- val hostName: String = Utils.getString(props, "hostname", InetAddress.getLocalHost.getHostAddress)
+ val hostName: String = props.getString("hostname", InetAddress.getLocalHost.getHostAddress)
/* the broker id for this server */
- val brokerId: Int = Utils.getIntInRange(props, "brokerid", (0, Int.MaxValue))
+ val brokerId: Int = props.getIntInRange("brokerid", (0, Int.MaxValue))
/* the SO_SNDBUFF buffer of the socket sever sockets */
- val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)
+ val socketSendBuffer: Int = props.getInt("socket.send.buffer", 100*1024)
/* the SO_RCVBUFF buffer of the socket sever sockets */
- val socketReceiveBuffer: Int = Utils.getInt(props, "socket.receive.buffer", 100*1024)
+ val socketReceiveBuffer: Int = props.getInt("socket.receive.buffer", 100*1024)
/* the maximum number of bytes in a socket request */
- val maxSocketRequestSize: Int = Utils.getIntInRange(props, "max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
+ val maxSocketRequestSize: Int = props.getIntInRange("max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
/* the number of network threads that the server uses for handling network requests */
- val numNetworkThreads = Utils.getIntInRange(props, "network.threads", 3, (1, Int.MaxValue))
+ val numNetworkThreads = props.getIntInRange("network.threads", 3, (1, Int.MaxValue))
/* the number of io threads that the server uses for carrying out network requests */
- val numIoThreads = Utils.getIntInRange(props, "io.threads", 8, (1, Int.MaxValue))
+ val numIoThreads = props.getIntInRange("io.threads", 8, (1, Int.MaxValue))
/* the number of queued requests allowed before blocking the network threads */
- val numQueuedRequests = Utils.getIntInRange(props, "max.queued.requests", 500, (1, Int.MaxValue))
+ val numQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue))
/* the interval in which to measure performance statistics */
- val monitoringPeriodSecs = Utils.getIntInRange(props, "monitoring.period.secs", 600, (1, Int.MaxValue))
+ val monitoringPeriodSecs = props.getIntInRange("monitoring.period.secs", 600, (1, Int.MaxValue))
/* the default number of log partitions per topic */
- val numPartitions = Utils.getIntInRange(props, "num.partitions", 1, (1, Int.MaxValue))
+ val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue))
/* the directory in which the log data is kept */
- val logDir = Utils.getString(props, "log.dir")
+ val logDir = props.getString("log.dir")
/* the maximum size of a single log file */
- val logFileSize = Utils.getIntInRange(props, "log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
+ val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
/* the maximum size of a single log file for some specific topic */
- val logFileSizeMap = Utils.getTopicFileSize(Utils.getString(props, "topic.log.file.size", ""))
+ val logFileSizeMap = Utils.getTopicFileSize(props.getString("topic.log.file.size", ""))
/* the maximum time before a new log segment is rolled out */
- val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, Int.MaxValue))
+ val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
/* the number of hours before rolling out a new log segment for some specific topic */
- val logRollHoursMap = Utils.getTopicRollHours(Utils.getString(props, "topic.log.roll.hours", ""))
+ val logRollHoursMap = Utils.getTopicRollHours(props.getString("topic.log.roll.hours", ""))
/* the number of hours to keep a log file before deleting it */
- val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24*7, (1, Int.MaxValue))
+ val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
/* the number of hours to keep a log file before deleting it for some specific topic*/
- val logRetentionHoursMap = Utils.getTopicRetentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
+ val logRetentionHoursMap = Utils.getTopicRetentionHours(props.getString("topic.log.retention.hours", ""))
/* the maximum size of the log before deleting it */
- val logRetentionSize = Utils.getLong(props, "log.retention.size", -1)
+ val logRetentionSize = props.getLong("log.retention.size", -1)
/* the maximum size of the log for some specific topic before deleting it */
- val logRetentionSizeMap = Utils.getTopicRetentionSize(Utils.getString(props, "topic.log.retention.size", ""))
+ val logRetentionSizeMap = Utils.getTopicRetentionSize(props.getString("topic.log.retention.size", ""))
/* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
- val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))
+ val logCleanupIntervalMinutes = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue))
/* the number of messages accumulated on a log partition before messages are flushed to disk */
- val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))
+ val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue))
/* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */
- val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))
+ val flushIntervalMap = Utils.getTopicFlushIntervals(props.getString("topic.flush.intervals.ms", ""))
/* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
- val flushSchedulerThreadRate = Utils.getInt(props, "log.default.flush.scheduler.interval.ms", 3000)
+ val flushSchedulerThreadRate = props.getInt("log.default.flush.scheduler.interval.ms", 3000)
/* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
- val defaultFlushIntervalMs = Utils.getInt(props, "log.default.flush.interval.ms", flushSchedulerThreadRate)
+ val defaultFlushIntervalMs = props.getInt("log.default.flush.interval.ms", flushSchedulerThreadRate)
/* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
- val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))
+ val topicPartitionsMap = Utils.getTopicPartitions(props.getString("topic.partition.count.map", ""))
/* enable auto creation of topic on the server */
- val autoCreateTopics = Utils.getBoolean(props, "auto.create.topics", true)
+ val autoCreateTopics = props.getBoolean("auto.create.topics", true)
/**
* Following properties are relevant to Kafka replication
*/
/* the socket timeout for controller-to-broker channels */
- val controllerSocketTimeoutMs = Utils.getInt(props, "controller.socket.timeout.ms", 30000)
+ val controllerSocketTimeoutMs = props.getInt("controller.socket.timeout.ms", 30000)
/* the buffer size for controller-to-broker-channels */
- val controllerMessageQueueSize= Utils.getInt(props, "controller.message.queue.size", 10)
+ val controllerMessageQueueSize= props.getInt("controller.message.queue.size", 10)
/* default replication factors for automatically created topics */
- val defaultReplicationFactor = Utils.getInt(props, "default.replication.factor", 1)
+ val defaultReplicationFactor = props.getInt("default.replication.factor", 1)
/* wait time in ms to allow the preferred replica for a partition to become the leader. This property is used during
* leader election on all replicas minus the preferred replica */
- val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
+ val preferredReplicaWaitTime = props.getLong("preferred.replica.wait.time", 300)
- val replicaMaxLagTimeMs = Utils.getLong(props, "replica.max.lag.time.ms", 10000)
+ val replicaMaxLagTimeMs = props.getLong("replica.max.lag.time.ms", 10000)
- val replicaMaxLagBytes = Utils.getLong(props, "replica.max.lag.bytes", 4000)
+ val replicaMaxLagBytes = props.getLong("replica.max.lag.bytes", 4000)
/* size of the state change request queue in Zookeeper */
- val stateChangeQSize = Utils.getInt(props, "state.change.queue.size", 1000)
+ val stateChangeQSize = props.getInt("state.change.queue.size", 1000)
/**
* Config options relevant to a follower for a replica
*/
/** the socket timeout for network requests */
- val replicaSocketTimeoutMs = Utils.getInt(props, "replica.socket.timeout.ms", ConsumerConfig.SocketTimeout)
+ val replicaSocketTimeoutMs = props.getInt("replica.socket.timeout.ms", ConsumerConfig.SocketTimeout)
/** the socket receive buffer for network requests */
- val replicaSocketBufferSize = Utils.getInt(props, "replica.socket.buffersize", ConsumerConfig.SocketBufferSize)
+ val replicaSocketBufferSize = props.getInt("replica.socket.buffersize", ConsumerConfig.SocketBufferSize)
/** the number of byes of messages to attempt to fetch */
- val replicaFetchSize = Utils.getInt(props, "replica.fetch.size", ConsumerConfig.FetchSize)
+ val replicaFetchSize = props.getInt("replica.fetch.size", ConsumerConfig.FetchSize)
/** max wait time for each fetcher request issued by follower replicas*/
- val replicaMaxWaitTimeMs = Utils.getInt(props, "replica.fetch.wait.time.ms", 500)
+ val replicaMaxWaitTimeMs = props.getInt("replica.fetch.wait.time.ms", 500)
/** minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */
- val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4096)
+ val replicaMinBytes = props.getInt("replica.fetch.min.bytes", 4096)
/* number of fetcher threads used to replicate messages from a source broker.
* Increasing this value can increase the degree of I/O parallelism in the follower broker. */
- val numReplicaFetchers = Utils.getInt(props, "replica.fetchers", 1)
+ val numReplicaFetchers = props.getInt("replica.fetchers", 1)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala Sat Aug 25 06:23:18 2012
@@ -32,6 +32,7 @@ class KafkaServerStartable(val serverCon
def startup() {
try {
server.startup()
+ serverConfig.verify()
}
catch {
case e =>
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Mx4jLoader.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Mx4jLoader.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Mx4jLoader.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Mx4jLoader.scala Sat Aug 25 06:23:18 2012
@@ -33,10 +33,11 @@ import javax.management.ObjectName
object Mx4jLoader extends Logging {
def maybeLoad(): Boolean = {
- if (!Utils.getBoolean(System.getProperties(), "kafka_mx4jenable", false))
+ val props = new VerifiableProperties(System.getProperties())
+ if (props.getBoolean("kafka_mx4jenable", false))
false
- val address = System.getProperty("mx4jaddress", "0.0.0.0")
- val port = Utils.getInt(System.getProperties(), "mx4jport", 8082)
+ val address = props.getString("mx4jaddress", "0.0.0.0")
+ val port = props.getInt("mx4jport", 8082)
try {
debug("Will try to load MX4j now, if it's in the classpath");
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Sat Aug 25 06:23:18 2012
@@ -209,62 +209,6 @@ object Utils extends Logging {
props.load(propStream)
props
}
-
- /**
- * Read a required integer property value or throw an exception if no such property is found
- */
- def getInt(props: Properties, name: String): Int = {
- require(props.containsKey(name), "Missing required property '" + name + "'")
- return getInt(props, name, -1)
- }
-
- def getIntInRange(props: Properties, name: String, range: (Int, Int)): Int = {
- require(props.containsKey(name), "Missing required property '" + name + "'")
- getIntInRange(props, name, -1, range)
- }
-
- /**
- * Read an integer from the properties instance
- * @param props The properties to read from
- * @param name The property name
- * @param default The default value to use if the property is not found
- * @return the integer value
- */
- def getInt(props: Properties, name: String, default: Int): Int =
- getIntInRange(props, name, default, (Int.MinValue, Int.MaxValue))
-
- def getShort(props: Properties, name: String, default: Short): Short =
- getShortInRange(props, name, default, (Short.MinValue, Short.MaxValue))
-
- /**
- * Read an integer from the properties instance. Throw an exception
- * if the value is not in the given range (inclusive)
- * @param props The properties to read from
- * @param name The property name
- * @param default The default value to use if the property is not found
- * @param range The range in which the value must fall (inclusive)
- * @throws IllegalArgumentException If the value is not in the given range
- * @return the integer value
- */
- def getIntInRange(props: Properties, name: String, default: Int, range: (Int, Int)): Int = {
- val v =
- if(props.containsKey(name))
- props.getProperty(name).toInt
- else
- default
- require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
- v
- }
-
- def getShortInRange(props: Properties, name: String, default: Short, range: (Short, Short)): Short = {
- val v =
- if(props.containsKey(name))
- props.getProperty(name).toShort
- else
- default
- require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
- v
- }
def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = {
val value = buffer.getInt
@@ -288,115 +232,6 @@ object Utils extends Logging {
}
/**
- * Read a required long property value or throw an exception if no such property is found
- */
- def getLong(props: Properties, name: String): Long = {
- require(props.containsKey(name), "Missing required property '" + name + "'")
- return getLong(props, name, -1)
- }
-
- /**
- * Read an long from the properties instance
- * @param props The properties to read from
- * @param name The property name
- * @param default The default value to use if the property is not found
- * @return the long value
- */
- def getLong(props: Properties, name: String, default: Long): Long =
- getLongInRange(props, name, default, (Long.MinValue, Long.MaxValue))
-
- /**
- * Read an long from the properties instance. Throw an exception
- * if the value is not in the given range (inclusive)
- * @param props The properties to read from
- * @param name The property name
- * @param default The default value to use if the property is not found
- * @param range The range in which the value must fall (inclusive)
- * @throws IllegalArgumentException If the value is not in the given range
- * @return the long value
- */
- def getLongInRange(props: Properties, name: String, default: Long, range: (Long, Long)): Long = {
- val v =
- if(props.containsKey(name))
- props.getProperty(name).toLong
- else
- default
- require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
- v
- }
-
- /**
- * Read a boolean value from the properties instance
- * @param props The properties to read from
- * @param name The property name
- * @param default The default value to use if the property is not found
- * @return the boolean value
- */
- def getBoolean(props: Properties, name: String, default: Boolean): Boolean = {
- if(!props.containsKey(name))
- default
- else {
- val v = props.getProperty(name)
- require(v == "true" || v == "false", "Unacceptable value for property '" + name + "', boolean values must be either 'true' or 'false")
- v.toBoolean
- }
- }
-
- /**
- * Get a string property, or, if no such property is defined, return the given default value
- */
- def getString(props: Properties, name: String, default: String): String = {
- if(props.containsKey(name))
- props.getProperty(name)
- else
- default
- }
-
- /**
- * Get a string property or throw and exception if no such property is defined.
- */
- def getString(props: Properties, name: String): String = {
- require(props.containsKey(name), "Missing required property '" + name + "'")
- props.getProperty(name)
- }
-
- /**
- * Get a property of type java.util.Properties or throw and exception if no such property is defined.
- */
- def getProps(props: Properties, name: String): Properties = {
- require(props.containsKey(name), "Missing required property '" + name + "'")
- val propString = props.getProperty(name)
- val propValues = propString.split(",")
- val properties = new Properties
- for(i <- 0 until propValues.length) {
- val prop = propValues(i).split("=")
- require(prop.length == 2, "Illegal format of specifying properties '" + propValues(i) + "'")
- properties.put(prop(0), prop(1))
- }
- properties
- }
-
- /**
- * Get a property of type java.util.Properties or return the default if no such property is defined
- */
- def getProps(props: Properties, name: String, default: Properties): Properties = {
- if(props.containsKey(name)) {
- val propString = props.getProperty(name)
- val propValues = propString.split(",")
- require(propValues.length >= 1, "Illegal format of specifying properties '" + propString + "'")
- val properties = new Properties
- for(i <- 0 until propValues.length) {
- val prop = propValues(i).split("=")
- require(prop.length == 2, "Illegal format of specifying properties '" + propValues(i) + "'")
- properties.put(prop(0), prop(1))
- }
- properties
- }
- else
- default
- }
-
- /**
* Open a channel for the given file
*/
def openChannel(file: File, mutable: Boolean): FileChannel = {
@@ -775,14 +610,6 @@ object Utils extends Logging {
else true
}
- def getCompressionCodec(props: Properties, codec: String): CompressionCodec = {
- val codecValueString = props.getProperty(codec)
- if(codecValueString == null)
- NoCompressionCodec
- else
- CompressionCodec.getCompressionCodec(codecValueString.toInt)
- }
-
def tryCleanupZookeeper(zkUrl: String, groupId: String) {
try {
val dir = "/consumers/" + groupId
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala?rev=1377220&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala Sat Aug 25 06:23:18 2012
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.util.Properties
+import collection.mutable
+
+class VerifiableProperties(val props: Properties) extends Logging {
+ private val referenceSet = mutable.HashSet[String]()
+
+ def containsKey(name: String): Boolean = {
+ props.containsKey(name)
+ }
+
+ def getProperty(name: String): String = {
+ val value = props.getProperty(name)
+ referenceSet.add(name)
+ return value
+ }
+
+ /**
+ * Read a required integer property value or throw an exception if no such property is found
+ */
+ def getInt(name: String): Int = {
+ require(containsKey(name), "Missing required property '" + name + "'")
+ return getInt(name, -1)
+ }
+
+ def getIntInRange(name: String, range: (Int, Int)): Int = {
+ require(containsKey(name), "Missing required property '" + name + "'")
+ getIntInRange(name, -1, range)
+ }
+
+ /**
+ * Read an integer from the properties instance
+ * @param name The property name
+ * @param default The default value to use if the property is not found
+ * @return the integer value
+ */
+ def getInt(name: String, default: Int): Int =
+ getIntInRange(name, default, (Int.MinValue, Int.MaxValue))
+
+ def getShort(name: String, default: Short): Short =
+ getShortInRange(name, default, (Short.MinValue, Short.MaxValue))
+
+ /**
+ * Read an integer from the properties instance. Throw an exception
+ * if the value is not in the given range (inclusive)
+ * @param name The property name
+ * @param default The default value to use if the property is not found
+ * @param range The range in which the value must fall (inclusive)
+ * @throws IllegalArgumentException If the value is not in the given range
+ * @return the integer value
+ */
+ def getIntInRange(name: String, default: Int, range: (Int, Int)): Int = {
+ val v =
+ if(containsKey(name))
+ getProperty(name).toInt
+ else
+ default
+ require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
+ v
+ }
+
+ def getShortInRange(name: String, default: Short, range: (Short, Short)): Short = {
+ val v =
+ if(containsKey(name))
+ getProperty(name).toShort
+ else
+ default
+ require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
+ v
+ }
+
+ /**
+ * Read a required long property value or throw an exception if no such property is found
+ */
+ def getLong(name: String): Long = {
+ require(containsKey(name), "Missing required property '" + name + "'")
+ return getLong(name, -1)
+ }
+
+ /**
+ * Read an long from the properties instance
+ * @param name The property name
+ * @param default The default value to use if the property is not found
+ * @return the long value
+ */
+ def getLong(name: String, default: Long): Long =
+ getLongInRange(name, default, (Long.MinValue, Long.MaxValue))
+
+ /**
+ * Read an long from the properties instance. Throw an exception
+ * if the value is not in the given range (inclusive)
+ * @param name The property name
+ * @param default The default value to use if the property is not found
+ * @param range The range in which the value must fall (inclusive)
+ * @throws IllegalArgumentException If the value is not in the given range
+ * @return the long value
+ */
+ def getLongInRange(name: String, default: Long, range: (Long, Long)): Long = {
+ val v =
+ if(containsKey(name))
+ getProperty(name).toLong
+ else
+ default
+ require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
+ v
+ }
+
+ /**
+ * Read a boolean value from the properties instance
+ * @param name The property name
+ * @param default The default value to use if the property is not found
+ * @return the boolean value
+ */
+ def getBoolean(name: String, default: Boolean): Boolean = {
+ if(!containsKey(name))
+ default
+ else {
+ val v = getProperty(name)
+ require(v == "true" || v == "false", "Unacceptable value for property '" + name + "', boolean values must be either 'true' or 'false")
+ v.toBoolean
+ }
+ }
+
+ /**
+ * Get a string property, or, if no such property is defined, return the given default value
+ */
+ def getString(name: String, default: String): String = {
+ if(containsKey(name))
+ getProperty(name)
+ else
+ default
+ }
+
+ /**
+ * Get a string property or throw and exception if no such property is defined.
+ */
+ def getString(name: String): String = {
+ require(containsKey(name), "Missing required property '" + name + "'")
+ getProperty(name)
+ }
+
+ def verify() {
+ info("Verifying properties")
+ val specifiedProperties = props.propertyNames()
+ while (specifiedProperties.hasMoreElements) {
+ val key = specifiedProperties.nextElement().asInstanceOf[String]
+ if (!referenceSet.contains(key))
+ warn("Property %s is not valid".format(key))
+ else
+ info("Property %s is overridden to %s".format(key, props.getProperty(key)))
+ }
+ }
+}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1377220&r1=1377219&r2=1377220&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Sat Aug 25 06:23:18 2012
@@ -17,7 +17,6 @@
package kafka.utils
-import java.util.Properties
import kafka.cluster.{Broker, Cluster}
import kafka.consumer.TopicCount
import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
@@ -600,16 +599,16 @@ class ZKGroupTopicDirs(group: String, to
}
-class ZKConfig(props: Properties) {
+class ZKConfig(props: VerifiableProperties) {
/** ZK host string */
- val zkConnect = Utils.getString(props, "zk.connect", null)
+ val zkConnect = props.getString("zk.connect", null)
/** zookeeper session timeout */
- val zkSessionTimeoutMs = Utils.getInt(props, "zk.sessiontimeout.ms", 6000)
+ val zkSessionTimeoutMs = props.getInt("zk.sessiontimeout.ms", 6000)
/** the max time that the client waits to establish a connection to zookeeper */
- val zkConnectionTimeoutMs = Utils.getInt(props, "zk.connectiontimeout.ms",zkSessionTimeoutMs)
+ val zkConnectionTimeoutMs = props.getInt("zk.connectiontimeout.ms",zkSessionTimeoutMs)
/** how far a ZK follower can be behind a ZK leader */
- val zkSyncTimeMs = Utils.getInt(props, "zk.synctime.ms", 2000)
+ val zkSyncTimeMs = props.getInt("zk.synctime.ms", 2000)
}