Posted to commits@kafka.apache.org by jk...@apache.org on 2011/08/02 01:42:17 UTC
svn commit: r1152970 [20/26] - in /incubator/kafka: branches/ site/ trunk/
trunk/bin/ trunk/clients/ trunk/clients/clojure/
trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/
trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...
Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,277 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import kafka.utils.Utils
+import java.util.concurrent.{CountDownLatch, Executors}
+import java.util.concurrent.atomic.AtomicLong
+import kafka.producer._
+import async.DefaultEventHandler
+import kafka.serializer.StringEncoder
+import org.apache.log4j.Logger
+import joptsimple.{OptionSet, OptionParser}
+import java.util.{Random, Properties}
+import kafka.message.{CompressionCodec, Message, ByteBufferMessageSet}
+
+/**
+ * Load test for the producer
+ */
+object ProducerPerformance {
+
+ def main(args: Array[String]) {
+
+ val logger = Logger.getLogger(getClass)
+ val config = new PerfConfig(args)
+ if(!config.isFixSize)
+ logger.info("WARN: Throughput will be slower due to changing message size per request")
+
+ val totalBytesSent = new AtomicLong(0)
+ val totalMessagesSent = new AtomicLong(0)
+ val executor = Executors.newFixedThreadPool(config.numThreads)
+ val allDone = new CountDownLatch(config.numThreads)
+ val startMs = System.currentTimeMillis
+ val rand = new java.util.Random
+
+ for(i <- 0 until config.numThreads) {
+ if(config.isAsync)
+ executor.execute(new AsyncProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))
+ else
+ executor.execute(new SyncProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))
+ }
+
+ allDone.await()
+ val elapsedSecs = (System.currentTimeMillis - startMs) / 1000.0
+ logger.info("Total Num Messages: " + totalMessagesSent.get + " bytes: " + totalBytesSent.get + " in " + elapsedSecs + " secs")
+ logger.info("Messages/sec: " + (1.0 * totalMessagesSent.get / elapsedSecs).formatted("%.4f"))
+ logger.info("MB/sec: " + (totalBytesSent.get / elapsedSecs / (1024.0*1024.0)).formatted("%.4f"))
+ System.exit(0)
+ }
+
+ class PerfConfig(args: Array[String]) {
+ val parser = new OptionParser
+ val brokerInfoOpt = parser.accepts("brokerinfo", "REQUIRED: broker info (either from zookeeper or a list).")
+ .withRequiredArg
+ .describedAs("broker.list=brokerid:hostname:port or zk.connect=host:port")
+ .ofType(classOf[String])
+ val topicOpt = parser.accepts("topic", "REQUIRED: The topic to produce to.")
+ .withRequiredArg
+ .describedAs("topic")
+ .ofType(classOf[String])
+ val numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to send.")
+ .withRequiredArg
+ .describedAs("count")
+ .ofType(classOf[java.lang.Integer])
+ val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
+ .withRequiredArg
+ .describedAs("size")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(100)
+ val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
+ val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
+ val delayMSBtwBatchOpt = parser.accepts("delay-btw-batch-ms", "Delay in ms between 2 batch sends.")
+ .withRequiredArg
+ .describedAs("ms")
+ .ofType(classOf[java.lang.Long])
+ .defaultsTo(0L)
+ val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
+ .withRequiredArg
+ .describedAs("size")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(200)
+ val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
+ .withRequiredArg
+ .describedAs("count")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(10)
+ val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.")
+ .withRequiredArg
+ .describedAs("size")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(5000)
+ val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed.")
+ .withRequiredArg
+ .describedAs("compression codec")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(0)
+
+ val options = parser.parse(args : _*)
+ for(arg <- List(brokerInfoOpt, topicOpt, numMessagesOpt)) {
+ if(!options.has(arg)) {
+ System.err.println("Missing required argument \"" + arg + "\"")
+ parser.printHelpOn(System.err)
+ System.exit(1)
+ }
+ }
+ val brokerInfo = options.valueOf(brokerInfoOpt)
+ val numMessages = options.valueOf(numMessagesOpt).intValue
+ val messageSize = options.valueOf(messageSizeOpt).intValue
+ val isFixSize = !options.has(varyMessageSizeOpt)
+ val isAsync = options.has(asyncOpt)
+ val delayedMSBtwSend = options.valueOf(delayMSBtwBatchOpt).longValue
+ var batchSize = options.valueOf(batchSizeOpt).intValue
+ val numThreads = options.valueOf(numThreadsOpt).intValue
+ val topic = options.valueOf(topicOpt)
+ val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
+ val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
+ }
+
+ private def getStringOfLength(len: Int) : String = {
+ val strArray = new Array[Char](len)
+ for (i <- 0 until len)
+ strArray(i) = 'x'
+ return new String(strArray)
+ }
+
+ class AsyncProducerThread(val threadId: Int,
+ val config: PerfConfig,
+ val totalBytesSent: AtomicLong,
+ val totalMessagesSent: AtomicLong,
+ val allDone: CountDownLatch,
+ val rand: Random) extends Runnable {
+ val logger = Logger.getLogger(getClass)
+ val brokerInfoList = config.brokerInfo.split("=")
+ val props = new Properties()
+ if (brokerInfoList(0) == "zk.connect")
+ props.put("zk.connect", brokerInfoList(1))
+ else
+ props.put("broker.list", brokerInfoList(1))
+
+ props.put("compression.codec", config.compressionCodec.codec.toString)
+ props.put("producer.type","async")
+ props.put("batch.size", config.batchSize.toString)
+ props.put("reconnect.interval", Integer.MAX_VALUE.toString)
+ props.put("buffer.size", (64*1024).toString)
+
+ logger.info("Producer properties = " + props.toString)
+
+ val producerConfig = new ProducerConfig(props)
+ val producer = new Producer[String, String](producerConfig, new StringEncoder,
+ new DefaultEventHandler[String](producerConfig, null), null, new DefaultPartitioner[String])
+
+ override def run {
+ var bytesSent = 0L
+ var lastBytesSent = 0L
+ var nSends = 0
+ var lastNSends = 0
+ var message = getStringOfLength(config.messageSize)
+ var reportTime = System.currentTimeMillis()
+ var lastReportTime = reportTime
+ val messagesPerThread = config.numMessages / config.numThreads
+ logger.info("Messages per thread = " + messagesPerThread)
+ for(j <- 0 until messagesPerThread) {
+ var strLength = config.messageSize
+ if (!config.isFixSize) {
+ strLength = rand.nextInt(config.messageSize)
+ message = getStringOfLength(strLength)
+ bytesSent += strLength
+ } else
+ bytesSent += config.messageSize
+ try {
+ producer.send(new ProducerData[String,String](config.topic, message))
+ if (config.delayedMSBtwSend > 0 && (nSends + 1) % config.batchSize == 0)
+ Thread.sleep(config.delayedMSBtwSend)
+ nSends += 1
+ } catch {
+ case e: Exception => e.printStackTrace
+ }
+ if(nSends % config.reportingInterval == 0) {
+ reportTime = System.currentTimeMillis()
+ logger.info("thread " + threadId + ": " + nSends + " messages sent "
+ + (1000.0 * (nSends - lastNSends) / (reportTime - lastReportTime)).formatted("%.4f") + " nMsg/sec "
+ + (1000.0 * (bytesSent - lastBytesSent) / (reportTime - lastReportTime) / (1024 * 1024)).formatted("%.4f") + " MBs/sec")
+ lastReportTime = reportTime
+ lastBytesSent = bytesSent
+ lastNSends = nSends
+ }
+ }
+ producer.close()
+ totalBytesSent.addAndGet(bytesSent)
+ totalMessagesSent.addAndGet(nSends)
+ allDone.countDown()
+ }
+ }
+
+ class SyncProducerThread(val threadId: Int,
+ val config: PerfConfig,
+ val totalBytesSent: AtomicLong,
+ val totalMessagesSent: AtomicLong,
+ val allDone: CountDownLatch,
+ val rand: Random) extends Runnable {
+ val logger = Logger.getLogger(getClass)
+ val props = new Properties()
+ val brokerInfoList = config.brokerInfo.split("=")
+ if (brokerInfoList(0) == "zk.connect")
+ props.put("zk.connect", brokerInfoList(1))
+ else
+ props.put("broker.list", brokerInfoList(1))
+ props.put("compression.codec", config.compressionCodec.toString)
+ props.put("reconnect.interval", Integer.MAX_VALUE.toString)
+ props.put("buffer.size", (64*1024).toString)
+
+ val producerConfig = new ProducerConfig(props)
+ val producer = new Producer[String, String](producerConfig, new StringEncoder,
+ new DefaultEventHandler[String](producerConfig, null), null, new DefaultPartitioner[String])
+
+ override def run {
+ var bytesSent = 0L
+ var lastBytesSent = 0L
+ var nSends = 0
+ var lastNSends = 0
+ val message = getStringOfLength(config.messageSize)
+ var reportTime = System.currentTimeMillis()
+ var lastReportTime = reportTime
+ val messagesPerThread = config.numMessages / config.numThreads / config.batchSize
+ logger.info("Messages per thread = " + messagesPerThread)
+ var messageSet: List[String] = Nil
+ for(k <- 0 until config.batchSize) {
+ messageSet ::= message
+ }
+ for(j <- 0 until messagesPerThread) {
+ var strLength = config.messageSize
+ if (!config.isFixSize) {
+ messageSet = Nil // rebuild the batch for this send instead of growing the old one unboundedly
+ for(k <- 0 until config.batchSize) {
+ strLength = rand.nextInt(config.messageSize)
+ messageSet ::= getStringOfLength(strLength)
+ bytesSent += strLength
+ }
+ } else
+ bytesSent += config.batchSize*config.messageSize
+ try {
+ producer.send(new ProducerData[String,String](config.topic, messageSet))
+ if (config.delayedMSBtwSend > 0 && (nSends + 1) % config.batchSize == 0)
+ Thread.sleep(config.delayedMSBtwSend)
+ nSends += 1
+ } catch {
+ case e: Exception => e.printStackTrace
+ }
+ if(nSends % config.reportingInterval == 0) {
+ reportTime = System.currentTimeMillis()
+ logger.info("thread " + threadId + ": " + nSends + " messages sent "
+ + (1000.0 * (nSends - lastNSends) * config.batchSize / (reportTime - lastReportTime)).formatted("%.4f") + " nMsg/sec "
+ + (1000.0 * (bytesSent - lastBytesSent) / (reportTime - lastReportTime) / (1024 * 1024)).formatted("%.4f") + " MBs/sec")
+ lastReportTime = reportTime
+ lastBytesSent = bytesSent
+ lastNSends = nSends
+ }
+ }
+ producer.close()
+ totalBytesSent.addAndGet(bytesSent)
+ totalMessagesSent.addAndGet(nSends*config.batchSize)
+ allDone.countDown()
+ }
+ }
+}
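
As an aside beyond the patch itself, a minimal sketch of driving this load test programmatically; the broker, topic and counts below are hypothetical:

    // Hypothetical invocation: 4 async sender threads, 100k messages of
    // 200 bytes each, brokers discovered via zookeeper.
    object ProducerPerformanceExample {
      def main(args: Array[String]) {
        kafka.tools.ProducerPerformance.main(Array(
          "--brokerinfo", "zk.connect=localhost:2181",
          "--topic", "perf-test",
          "--messages", "100000",
          "--message-size", "200",
          "--threads", "4",
          "--async"))
      }
    }
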
Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.net.URI
+import java.io._
+import joptsimple._
+import kafka.message._
+import kafka.producer._
+import java.util.Properties
+
+/**
+ * Interactive shell for producing messages from the command line
+ */
+object ProducerShell {
+
+ def main(args: Array[String]) {
+
+ val parser = new OptionParser
+ val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
+ .withRequiredArg
+ .describedAs("kafka://hostname:port")
+ .ofType(classOf[String])
+ val topicOpt = parser.accepts("topic", "REQUIRED: The topic to produce to.")
+ .withRequiredArg
+ .describedAs("topic")
+ .ofType(classOf[String])
+
+ val options = parser.parse(args : _*)
+
+ for(arg <- List(urlOpt, topicOpt)) {
+ if(!options.has(arg)) {
+ System.err.println("Missing required argument \"" + arg + "\"")
+ parser.printHelpOn(System.err)
+ System.exit(1)
+ }
+ }
+
+ val url = new URI(options.valueOf(urlOpt))
+ val topic = options.valueOf(topicOpt)
+ val props = new Properties()
+ props.put("host", url.getHost)
+ props.put("port", url.getPort.toString)
+ props.put("buffer.size", "65536")
+ props.put("connect.timeout.ms", "10000")
+ props.put("reconnect.interval", "100")
+ val producer = new SyncProducer(new SyncProducerConfig(props))
+
+ val input = new BufferedReader(new InputStreamReader(System.in))
+ var done = false
+ while(!done) {
+ val line = input.readLine()
+ if(line == null) {
+ done = true
+ } else {
+ val lineBytes = line.trim.getBytes()
+ val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(lineBytes))
+ producer.send(topic, messageList)
+ println("Sent: %d (%ld) bytes)".format(line, messageList.sizeInBytes))
+ }
+ }
+ producer.close()
+ }
+}
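
For illustration (not part of the patch): the shell reads one message per line from standard input until EOF, so any line source can be fed into it. The values below are made up:

    // Hypothetical: send two lines to topic "chat" on a broker at localhost:9092.
    object ProducerShellExample {
      def main(args: Array[String]) {
        System.setIn(new java.io.ByteArrayInputStream("hello\nworld\n".getBytes))
        kafka.tools.ProducerShell.main(Array(
          "--server", "kafka://localhost:9092",
          "--topic", "chat"))
      }
    }
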
Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,197 @@
+package kafka.tools
+
+import java.io.File
+import joptsimple.OptionParser
+import org.apache.log4j.Logger
+import java.util.concurrent.{Executors, CountDownLatch}
+import java.util.Properties
+import kafka.producer.async.DefaultEventHandler
+import kafka.serializer.{DefaultEncoder, StringEncoder}
+import kafka.producer.{ProducerData, DefaultPartitioner, ProducerConfig, Producer}
+import kafka.consumer._
+import kafka.utils.{StringSerializer, Utils}
+import kafka.api.OffsetRequest
+import org.I0Itec.zkclient._
+import kafka.message.{CompressionCodec, Message, MessageSet, FileMessageSet}
+
+object ReplayLogProducer {
+
+ private val GROUPID: String = "replay-log-producer"
+ private val logger = Logger.getLogger(getClass)
+
+ def main(args: Array[String]) {
+
+ val config = new Config(args)
+
+ val executor = Executors.newFixedThreadPool(config.numThreads)
+ val allDone = new CountDownLatch(config.numThreads)
+
+ // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
+ tryCleanupZookeeper(config.zkConnect, GROUPID)
+ Thread.sleep(500)
+
+ // consumer properties
+ val consumerProps = new Properties
+ consumerProps.put("groupid", GROUPID)
+ consumerProps.put("zk.connect", config.zkConnect)
+ consumerProps.put("consumer.timeout.ms", "10000")
+ consumerProps.put("autooffset.reset", OffsetRequest.SmallestTimeString)
+ consumerProps.put("fetch.size", (1024*1024).toString)
+ consumerProps.put("socket.buffer.size", (2 * 1024 * 1024).toString)
+ val consumerConfig = new ConsumerConfig(consumerProps)
+ val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+ val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.inputTopic -> config.numThreads))
+ var threadList = List[ZKConsumerThread]()
+ for ((topic, streamList) <- topicMessageStreams)
+ for (stream <- streamList)
+ threadList ::= new ZKConsumerThread(config, stream)
+
+ for (thread <- threadList)
+ thread.start
+
+ threadList.foreach(_.shutdown)
+ consumerConnector.shutdown
+ }
+
+ class Config(args: Array[String]) {
+ val parser = new OptionParser
+ val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+ "Multiple URLS can be given to allow fail-over.")
+ .withRequiredArg
+ .describedAs("zookeeper url")
+ .ofType(classOf[String])
+ .defaultsTo("127.0.0.1:2181")
+ val brokerInfoOpt = parser.accepts("brokerinfo", "REQUIRED: broker info (either from zookeeper or a list).")
+ .withRequiredArg
+ .describedAs("broker.list=brokerid:hostname:port or zk.connect=host:port")
+ .ofType(classOf[String])
+ val inputTopicOpt = parser.accepts("inputtopic", "REQUIRED: The topic to consume from.")
+ .withRequiredArg
+ .describedAs("input-topic")
+ .ofType(classOf[String])
+ val outputTopicOpt = parser.accepts("outputtopic", "REQUIRED: The topic to produce to.")
+ .withRequiredArg
+ .describedAs("output-topic")
+ .ofType(classOf[String])
+ val numMessagesOpt = parser.accepts("messages", "The number of messages to send.")
+ .withRequiredArg
+ .describedAs("count")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(-1)
+ val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
+ val delayMSBtwBatchOpt = parser.accepts("delay-btw-batch-ms", "Delay in ms between 2 batch sends.")
+ .withRequiredArg
+ .describedAs("ms")
+ .ofType(classOf[java.lang.Long])
+ .defaultsTo(0L)
+ val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
+ .withRequiredArg
+ .describedAs("batch size")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(200)
+ val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
+ .withRequiredArg
+ .describedAs("threads")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(1)
+ val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.")
+ .withRequiredArg
+ .describedAs("size")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(5000)
+ val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed.")
+ .withRequiredArg
+ .describedAs("compression codec")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(0)
+
+ val options = parser.parse(args : _*)
+ for(arg <- List(brokerInfoOpt, inputTopicOpt)) {
+ if(!options.has(arg)) {
+ System.err.println("Missing required argument \"" + arg + "\"")
+ parser.printHelpOn(System.err)
+ System.exit(1)
+ }
+ }
+ val zkConnect = options.valueOf(zkConnectOpt)
+ val brokerInfo = options.valueOf(brokerInfoOpt)
+ val numMessages = options.valueOf(numMessagesOpt).intValue
+ val isAsync = options.has(asyncOpt)
+ val delayedMSBtwSend = options.valueOf(delayMSBtwBatchOpt).longValue
+ var batchSize = options.valueOf(batchSizeOpt).intValue
+ val numThreads = options.valueOf(numThreadsOpt).intValue
+ val inputTopic = options.valueOf(inputTopicOpt)
+ val outputTopic = options.valueOf(outputTopicOpt)
+ val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
+ val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
+ }
+
+ def tryCleanupZookeeper(zkUrl: String, groupId: String) {
+ try {
+ val dir = "/consumers/" + groupId
+ logger.info("Cleaning up temporary zookeeper data under " + dir + ".")
+ val zk = new ZkClient(zkUrl, 30*1000, 30*1000, StringSerializer)
+ zk.deleteRecursive(dir)
+ zk.close()
+ } catch {
+ case _ => // swallow
+ }
+ }
+
+ class ZKConsumerThread(config: Config, stream: KafkaMessageStream) extends Thread {
+ val shutdownLatch = new CountDownLatch(1)
+ val logger = Logger.getLogger(getClass)
+ val props = new Properties()
+ val brokerInfoList = config.brokerInfo.split("=")
+ if (brokerInfoList(0) == "zk.connect")
+ props.put("zk.connect", brokerInfoList(1))
+ else
+ props.put("broker.list", brokerInfoList(1))
+ props.put("reconnect.interval", Integer.MAX_VALUE.toString)
+ props.put("buffer.size", (64*1024).toString)
+ props.put("compression.codec", config.compressionCodec.codec.toString)
+ props.put("batch.size", config.batchSize.toString)
+
+ if(config.isAsync)
+ props.put("producer.type", "async")
+
+ val producerConfig = new ProducerConfig(props)
+ val producer = new Producer[Message, Message](producerConfig, new DefaultEncoder,
+ new DefaultEventHandler[Message](producerConfig, null),
+ null, new DefaultPartitioner[Message])
+
+ override def run() {
+ logger.info("Starting consumer thread..")
+ var messageCount: Int = 0
+ try {
+ val iter =
+ if(config.numMessages >= 0)
+ stream.slice(0, config.numMessages)
+ else
+ stream
+ for (message <- iter) {
+ try {
+ producer.send(new ProducerData[Message, Message](config.outputTopic, message))
+ if (config.delayedMSBtwSend > 0 && (messageCount + 1) % config.batchSize == 0)
+ Thread.sleep(config.delayedMSBtwSend)
+ messageCount += 1
+ } catch {
+ case ie: Exception => logger.error("Skipping this message", ie)
+ }
+ }
+ } catch {
+ case e: ConsumerTimeoutException => logger.error("consumer thread timing out", e)
+ }
+ logger.info("Sent " + messageCount + " messages")
+ shutdownLatch.countDown
+ logger.info("thread finished execution !" )
+ }
+
+ def shutdown() {
+ shutdownLatch.await
+ producer.close
+ }
+
+ }
+}
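
The tool consumes --inputtopic through the regular consumer (offsets reset to the smallest available) and republishes every message to --outputtopic. A hypothetical invocation, shown as a Scala script:

    // Hypothetical: copy all of "events" into "events-copy" with two threads.
    kafka.tools.ReplayLogProducer.main(Array(
      "--zookeeper", "localhost:2181",
      "--brokerinfo", "zk.connect=localhost:2181",
      "--inputtopic", "events",
      "--outputtopic", "events-copy",
      "--threads", "2"))
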
Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.net.URI
+import joptsimple._
+import kafka.api.FetchRequest
+import kafka.utils._
+import kafka.server._
+import kafka.consumer.SimpleConsumer
+
+/**
+ * Performance test for the simple consumer
+ */
+object SimpleConsumerPerformance {
+
+ def main(args: Array[String]) {
+
+ val parser = new OptionParser
+ val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
+ .withRequiredArg
+ .describedAs("kafka://hostname:port")
+ .ofType(classOf[String])
+ val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
+ .withRequiredArg
+ .describedAs("topic")
+ .ofType(classOf[String])
+ val fetchSizeOpt = parser.accepts("fetch-size", "REQUIRED: The fetch size to use for consumption.")
+ .withRequiredArg
+ .describedAs("bytes")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(1024*1024)
+
+ val options = parser.parse(args : _*)
+
+ for(arg <- List(urlOpt, topicOpt)) {
+ if(!options.has(arg)) {
+ System.err.println("Missing required argument \"" + arg + "\"")
+ parser.printHelpOn(System.err)
+ System.exit(1)
+ }
+ }
+
+ val url = new URI(options.valueOf(urlOpt))
+ val topic = options.valueOf(topicOpt)
+ val fetchSize = options.valueOf(fetchSizeOpt).intValue
+
+ val consumer = new SimpleConsumer(url.getHost, url.getPort, 30*1000, 2*fetchSize)
+ val startMs = System.currentTimeMillis
+ var done = false
+ var totalRead = 0
+ val reportingInterval = 100000
+ var consumedInInterval = 0
+ var offset: Long = 0L
+ while(!done) {
+ val messages = consumer.fetch(new FetchRequest(topic, 0, offset, fetchSize))
+ var messagesRead = 0
+ for(message <- messages)
+ messagesRead += 1
+
+ if(messagesRead == 0)
+ done = true
+ else
+ offset += messages.validBytes
+
+ totalRead += messagesRead
+ consumedInInterval += messagesRead
+
+ if(consumedInInterval > reportingInterval) {
+ println("Bytes read: " + totalRead)
+ consumedInInterval = 0
+ }
+ }
+ val elapsedSeconds = (System.currentTimeMillis - startMs) / 1000.0
+ println(totalRead + " messages read, " + offset + " bytes")
+ println("Messages/sec: " + totalRead / elapsedSeconds)
+ println("MB/sec: " + offset / elapsedSeconds / (1024.0*1024.0))
+ System.exit(0)
+ }
+
+}
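
The test fetches partition 0 from offset 0 and stops at the first empty fetch. A hypothetical invocation, again as a Scala script:

    // Hypothetical: measure raw consumption of topic "perf-test" with 2MB fetches.
    kafka.tools.SimpleConsumerPerformance.main(Array(
      "--server", "kafka://localhost:9092",
      "--topic", "perf-test",
      "--fetch-size", (2*1024*1024).toString))
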
Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.net.URI
+import joptsimple._
+import kafka.api.FetchRequest
+import kafka.utils._
+import kafka.consumer._
+import kafka.server._
+
+/**
+ * Command line program to dump out messages to standard out using the simple consumer
+ */
+object SimpleConsumerShell {
+
+ def main(args: Array[String]): Unit = {
+
+ val parser = new OptionParser
+ val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
+ .withRequiredArg
+ .describedAs("kafka://hostname:port")
+ .ofType(classOf[String])
+ val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
+ .withRequiredArg
+ .describedAs("topic")
+ .ofType(classOf[String])
+ val partitionOpt = parser.accepts("partition", "The partition to consume from.")
+ .withRequiredArg
+ .describedAs("partition")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(0)
+ val offsetOpt = parser.accepts("offset", "The offset to start consuming from.")
+ .withRequiredArg
+ .describedAs("offset")
+ .ofType(classOf[java.lang.Long])
+ .defaultsTo(0L)
+ val fetchsizeOpt = parser.accepts("fetchsize", "The fetch size of each request.")
+ .withRequiredArg
+ .describedAs("fetchsize")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(1000000)
+
+ val options = parser.parse(args : _*)
+
+ for(arg <- List(urlOpt, topicOpt)) {
+ if(!options.has(arg)) {
+ System.err.println("Missing required argument \"" + arg + "\"")
+ parser.printHelpOn(System.err)
+ System.exit(1)
+ }
+ }
+
+ val url = new URI(options.valueOf(urlOpt))
+ val topic = options.valueOf(topicOpt)
+ val partition = options.valueOf(partitionOpt).intValue
+ val startingOffset = options.valueOf(offsetOpt).longValue
+ val fetchsize = options.valueOf(fetchsizeOpt).intValue
+
+ println("Starting consumer...")
+ val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 64*1024)
+ val thread = Utils.newThread("kafka-consumer", new Runnable() {
+ def run() {
+ var offset = startingOffset
+ while(true) {
+ val fetchRequest = new FetchRequest(topic, partition, offset, fetchsize)
+ val messageSets = consumer.multifetch(fetchRequest)
+ for (messages <- messageSets) {
+ println("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset)
+ var consumed = 0
+ for(messageAndOffset <- messages) {
+ println("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
+ consumed += 1
+ }
+ if(consumed > 0)
+ offset += messages.validBytes
+ }
+ Thread.sleep(10000)
+ }
+ }
+ }, false);
+ thread.start()
+ thread.join()
+ }
+
+}
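
Note that offsets in this API are byte positions: the loop advances by validBytes, not by a message count. A hypothetical invocation as a Scala script:

    // Hypothetical: tail topic "events", partition 1, from byte offset 0,
    // using 1MB fetch requests.
    kafka.tools.SimpleConsumerShell.main(Array(
      "--server", "kafka://localhost:9092",
      "--topic", "events",
      "--partition", "1",
      "--offset", "0",
      "--fetchsize", "1048576"))
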
Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Annotations.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Annotations.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Annotations.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Annotations.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+/* Some helpful annotations */
+
+/**
+ * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is part of the interface that an implementation
+ * must respect.
+ */
+class threadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is not threadsafe
+ */
+class nonthreadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is immutable
+ */
+class immutable extends StaticAnnotation
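
These are plain marker annotations, so they can be attached to any class with no runtime cost. A small sketch of the intended use (the class itself is hypothetical):

    import kafka.utils._

    // Hypothetical: document the concurrency contract of a class.
    @threadsafe
    class Counter {
      private val count = new java.util.concurrent.atomic.AtomicLong(0)
      def increment(): Long = count.incrementAndGet
    }
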
Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/DelayedItem.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/DelayedItem.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/DelayedItem.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/DelayedItem.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.util.concurrent._
+import scala.math._
+
+class DelayedItem[T](val item: T, delay: Long, unit: TimeUnit) extends Delayed {
+
+ val delayMs = unit.toMillis(delay)
+ val createdMs = System.currentTimeMillis
+
+ def this(item: T, delayMs: Long) =
+ this(item, delayMs, TimeUnit.MILLISECONDS)
+
+ /**
+ * The remaining delay time
+ */
+ def getDelay(unit: TimeUnit): Long = {
+ val elapsedMs = (System.currentTimeMillis - createdMs)
+ unit.convert(max(delayMs - elapsedMs, 0), TimeUnit.MILLISECONDS)
+ }
+
+ def compareTo(d: Delayed): Int = {
+ val delayed = d.asInstanceOf[DelayedItem[T]]
+ val myEnd = createdMs + delayMs
+ val yourEnd = delayed.createdMs + delayed.delayMs
+
+ if(myEnd < yourEnd) -1
+ else if(myEnd > yourEnd) 1
+ else 0
+ }
+
+}
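
Because DelayedItem implements java.util.concurrent.Delayed, it slots directly into a DelayQueue, whose take() only returns an item once its delay has expired. A minimal sketch with hypothetical payloads:

    import java.util.concurrent.{DelayQueue, TimeUnit}
    import kafka.utils.DelayedItem

    val queue = new DelayQueue[DelayedItem[String]]()
    queue.offer(new DelayedItem("slow", 200, TimeUnit.MILLISECONDS))
    queue.offer(new DelayedItem("fast", 50))   // ms-based convenience constructor
    println(queue.take().item)                 // prints "fast" after ~50 ms
    println(queue.take().item)                 // prints "slow" after ~200 ms
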
Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.io._
+import kafka.message._
+import kafka.utils._
+
+object DumpLogSegments {
+
+ def main(args: Array[String]) {
+ var isNoPrint = false
+ for(arg <- args)
+ if ("-noprint".compareToIgnoreCase(arg) == 0)
+ isNoPrint = true
+
+ for(arg <- args) {
+ if ("-noprint".compareToIgnoreCase(arg) != 0) {
+ val file = new File(arg)
+ println("Dumping " + file)
+ var offset = file.getName().split("\\.")(0).toLong
+ println("Starting offset: " + offset)
+ val messageSet = new FileMessageSet(file, false)
+ for(messageAndOffset <- messageSet) {
+ println("----------------------------------------------")
+ if (messageAndOffset.message.isValid)
+ println("offset:\t" + offset)
+ else
+ println("offset:\t %d \t invalid".format(offset))
+ if (!isNoPrint)
+ println("payload:\t" + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
+ offset += messageAndOffset.offset
+ }
+ }
+ }
+ }
+
+}
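
The starting offset is derived from the segment file name, so files keep their on-disk names. A hypothetical invocation as a Scala script (the path is made up):

    // Hypothetical: dump offsets only, suppressing payload printing with -noprint.
    kafka.utils.DumpLogSegments.main(Array(
      "-noprint",
      "/tmp/kafka-logs/events-0/00000000000000000000.kafka"))
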
Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+class State
+object DONE extends State
+object READY extends State
+object NOT_READY extends State
+object FAILED extends State
+
+/**
+ * Transliteration of the iterator template in google collections. To implement an iterator,
+ * override makeNext and call allDone() when there are no more items.
+ */
+abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T] {
+
+ private var state: State = NOT_READY
+ private var nextItem: Option[T] = None
+
+ def next(): T = {
+ if(!hasNext())
+ throw new NoSuchElementException()
+ state = NOT_READY
+ nextItem match {
+ case Some(item) => item
+ case None => throw new IllegalStateException("Expected item but none found.")
+ }
+ }
+
+ def hasNext(): Boolean = {
+ if(state == FAILED)
+ throw new IllegalStateException("Iterator is in failed state")
+ state match {
+ case DONE => false
+ case READY => true
+ case _ => maybeComputeNext()
+ }
+ }
+
+ protected def makeNext(): T
+
+ def maybeComputeNext(): Boolean = {
+ state = FAILED
+ nextItem = Some(makeNext())
+ if(state == DONE) {
+ false
+ } else {
+ state = READY
+ true
+ }
+ }
+
+ protected def allDone(): T = {
+ state = DONE
+ null.asInstanceOf[T]
+ }
+
+ def remove =
+ throw new UnsupportedOperationException("Removal not supported")
+
+}
+
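A subclass only supplies makeNext(), returning allDone() to signal exhaustion; all hasNext/next bookkeeping lives in the template. A minimal sketch (the class is hypothetical):

    import kafka.utils.IteratorTemplate

    // Hypothetical: iterate over an array via the template.
    class ArrayIterator[T](items: Array[T]) extends IteratorTemplate[T] {
      private var i = 0
      protected def makeNext(): T = {
        if(i >= items.length)
          allDone()          // flips state to DONE and returns a null sentinel
        else {
          i += 1
          items(i - 1)
        }
      }
    }

    // new ArrayIterator(Array(1, 2, 3)).foreach(println)  // prints 1 2 3
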
Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import kafka.utils._
+import org.apache.log4j.Logger
+
+/**
+ * A scheduler for running jobs in the background
+ * TODO: ScheduledThreadPoolExecutor notoriously swallows exceptions
+ */
+class KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon: Boolean) {
+ private val logger = Logger.getLogger(getClass())
+ private val threadId = new AtomicLong(0)
+ private val executor = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactory() {
+ def newThread(runnable: Runnable): Thread = {
+ val t = new Thread(runnable, baseThreadName + threadId.getAndIncrement)
+ t.setDaemon(isDaemon)
+ t
+ }
+ })
+
+ def scheduleWithRate(fun: () => Unit, delayMs: Long, periodMs: Long) =
+ executor.scheduleAtFixedRate(Utils.loggedRunnable(fun), delayMs, periodMs, TimeUnit.MILLISECONDS)
+
+ def shutdown() = {
+ executor.shutdownNow
+ logger.info("shutdown scheduler " + baseThreadName)
+ }
+}
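
scheduleWithRate takes a plain function, which Utils.loggedRunnable wraps so background failures are at least logged. A small sketch with hypothetical names:

    import kafka.utils.KafkaScheduler

    // Hypothetical: log a heartbeat once a second on a single daemon thread.
    val scheduler = new KafkaScheduler(1, "example-scheduler-", true)
    scheduler.scheduleWithRate(() => println("heartbeat"), 0, 1000)
    // ... later, on shutdown:
    scheduler.shutdown()
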
Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/MockTime.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/MockTime.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/MockTime.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/MockTime.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.util.concurrent._
+
+class MockTime(@volatile var currentMs: Long) extends Time {
+
+ def this() = this(System.currentTimeMillis)
+
+ def milliseconds: Long = currentMs
+
+ def nanoseconds: Long =
+ TimeUnit.NANOSECONDS.convert(currentMs, TimeUnit.MILLISECONDS)
+
+ def sleep(ms: Long): Unit =
+ currentMs += ms
+
+}
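
MockTime only advances when sleep() is called, which makes time-dependent code deterministic under test. A minimal sketch:

    import kafka.utils.MockTime

    val time = new MockTime(0L)
    time.sleep(5000)                 // advances the clock without blocking
    assert(time.milliseconds == 5000)
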
Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Pool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Pool.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Pool.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Pool.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.util.ArrayList
+import java.util.concurrent._
+import collection.JavaConversions
+
+class Pool[K,V] extends Iterable[(K, V)] {
+
+ private val pool = new ConcurrentHashMap[K, V]
+
+ def this(m: collection.Map[K, V]) {
+ this()
+ for((k,v) <- m.elements)
+ pool.put(k, v)
+ }
+
+ def put(k: K, v: V) = pool.put(k, v)
+
+ def putIfNotExists(k: K, v: V) = pool.putIfAbsent(k, v)
+
+ def contains(id: K) = pool.containsKey(id)
+
+ def get(key: K): V = pool.get(key)
+
+ def remove(key: K): V = pool.remove(key)
+
+ def keys = JavaConversions.asSet(pool.keySet())
+
+ def values: Iterable[V] =
+ JavaConversions.asIterable(new ArrayList[V](pool.values()))
+
+ def clear: Unit = pool.clear()
+
+ override def size = pool.size
+
+ override def iterator = new Iterator[(K,V)]() {
+
+ private val iter = pool.entrySet.iterator
+
+ def hasNext: Boolean = iter.hasNext
+
+ def next: (K, V) = {
+ val n = iter.next
+ (n.getKey, n.getValue)
+ }
+
+ }
+
+}
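
Pool is a thin wrapper over ConcurrentHashMap that exposes a Scala Iterable of key/value pairs. A quick sketch with hypothetical contents:

    import kafka.utils.Pool

    val pool = new Pool[String, Int]
    pool.put("a", 1)
    pool.putIfNotExists("a", 99)     // no-op: "a" is already present
    println(pool.get("a"))           // 1
    for((k, v) <- pool)              // iterate as (key, value) tuples
      println(k + " -> " + v)
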
Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Range.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Range.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Range.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Range.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import scala.math._
+
+/**
+ * A generic range value with a start and end
+ */
+trait Range {
+ /** The first index in the range */
+ def start: Long
+ /** The total number of indexes in the range */
+ def size: Long
+ /** Return true iff the range is empty */
+ def isEmpty: Boolean = size == 0
+
+ /** Check whether the given value falls in the range */
+ def contains(value: Long): Boolean =
+ (size == 0 && value == start) ||
+ (size > 0 && value >= start && value <= start + size - 1)
+
+ override def toString() = "(start=" + start + ", size=" + size + ")"
+}
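
Since start and size are the only abstract members, a concrete range can be an anonymous instance; note that contains treats an empty range as containing only its start. A minimal sketch:

    import kafka.utils.Range

    val r = new Range { def start = 10L; def size = 5L }
    println(r.contains(14))   // true:  last index is start + size - 1 = 14
    println(r.contains(15))   // false: one past the end
    println(r)                // (start=10, size=5)
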
Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Throttler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Throttler.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Throttler.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Throttler.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils;
+
+import org.apache.log4j.Logger
+import scala.math._
+
+object Throttler {
+ val logger = Logger.getLogger(classOf[Throttler])
+ val DefaultCheckIntervalMs = 100L
+}
+
+/**
+ * A class to measure and throttle the rate of some process. The throttler takes a desired rate-per-second
+ * (the units of the process don't matter, it could be bytes or a count of some other thing), and will sleep for
+ * an appropriate amount of time when maybeThrottle() is called to attain the desired rate.
+ *
+ * @param desiredRatePerSec: The rate we want to hit in units/sec
+ * @param checkIntervalMs: The interval at which to check our rate
+ * @param throttleDown: Does throttling increase or decrease our rate?
+ * @param time: The time implementation to use
+ */
+@nonthreadsafe
+class Throttler(val desiredRatePerSec: Double,
+ val checkIntervalMs: Long,
+ val throttleDown: Boolean,
+ val time: Time) {
+
+ private val lock = new Object
+ private var periodStartNs: Long = time.nanoseconds
+ private var observedSoFar: Double = 0.0
+
+ def this(desiredRatePerSec: Double, throttleDown: Boolean) =
+ this(desiredRatePerSec, Throttler.DefaultCheckIntervalMs, throttleDown, SystemTime)
+
+ def this(desiredRatePerSec: Double) =
+ this(desiredRatePerSec, Throttler.DefaultCheckIntervalMs, true, SystemTime)
+
+ def maybeThrottle(observed: Double) {
+ lock synchronized {
+ observedSoFar += observed
+ val now = time.nanoseconds
+ val elapsedNs = now - periodStartNs
+ // if we have completed an interval AND we have observed something, maybe
+ // we should take a little nap
+ if(elapsedNs > checkIntervalMs * Time.NsPerMs && observedSoFar > 0) {
+ val rateInSecs = (observedSoFar * Time.NsPerSec) / elapsedNs
+ val needAdjustment = !(throttleDown ^ (rateInSecs > desiredRatePerSec))
+ if(needAdjustment) {
+ // solve for the amount of time to sleep to make us hit the desired rate
+ val desiredRateMs = desiredRatePerSec / Time.MsPerSec.asInstanceOf[Double]
+ val elapsedMs = elapsedNs / Time.NsPerMs
+ val sleepTime = round(observedSoFar / desiredRateMs - elapsedMs)
+ if(sleepTime > 0) {
+ if(Throttler.logger.isDebugEnabled)
+ Throttler.logger.debug("Natural rate is " + rateInSecs + " per second but desired rate is " + desiredRatePerSec +
+ ", sleeping for " + sleepTime + " ms to compensate.")
+ time.sleep(sleepTime)
+ }
+ }
+ periodStartNs = now
+ observedSoFar = 0
+ }
+ }
+ }
+
+}
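
The caller reports work as it happens and maybeThrottle sleeps just enough to hold the observed rate at the target. A small sketch throttling a hypothetical copy loop to 1 MB/sec:

    import kafka.utils.Throttler

    // Hypothetical: throttle a copy loop to 1 MB/sec.
    val throttler = new Throttler(1024.0 * 1024.0)   // desired rate in bytes/sec
    val chunk = new Array[Byte](64 * 1024)           // stand-in unit of work
    for(i <- 0 until 1000) {
      // ... copy `chunk` somewhere ...
      throttler.maybeThrottle(chunk.length)          // sleeps if the observed rate is too high
    }
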
Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Time.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Time.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Time.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Time.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+/**
+ * Some common constants
+ */
+object Time {
+ val NsPerUs = 1000
+ val UsPerMs = 1000
+ val MsPerSec = 1000
+ val NsPerMs = NsPerUs * UsPerMs
+ val NsPerSec = NsPerMs * MsPerSec
+ val UsPerSec = UsPerMs * MsPerSec
+ val SecsPerMin = 60
+ val MinsPerHour = 60
+ val HoursPerDay = 24
+ val SecsPerHour = SecsPerMin * MinsPerHour
+ val SecsPerDay = SecsPerHour * HoursPerDay
+ val MinsPerDay = MinsPerHour * HoursPerDay
+}
+
+/**
+ * A mockable interface for time functions
+ */
+trait Time {
+
+ def milliseconds: Long
+
+ def nanoseconds: Long
+
+ def sleep(ms: Long)
+}
+
+/**
+ * The normal system implementation of time functions
+ */
+object SystemTime extends Time {
+
+ def milliseconds: Long = System.currentTimeMillis
+
+ def nanoseconds: Long = System.nanoTime
+
+ def sleep(ms: Long): Unit = Thread.sleep(ms)
+
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import org.I0Itec.zkclient.ZkClient
+import kafka.consumer.{SimpleConsumer, ConsumerConfig}
+import kafka.cluster.Partition
+import kafka.api.OffsetRequest
+
+/**
+ * A utility that updates, in ZK, the offset of every broker partition to the offset of the latest log segment file.
+ */
+object UpdateOffsetsInZK {
+ val Earliest = "earliest"
+ val Latest = "latest"
+
+ def main(args: Array[String]) {
+ if(args.length < 3)
+ usage
+ val config = new ConsumerConfig(Utils.loadProps(args(1)))
+ val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
+ config.zkConnectionTimeoutMs, StringSerializer)
+ args(0) match {
+ case Earliest => getAndSetOffsets(zkClient, OffsetRequest.EarliestTime, config, args(2))
+ case Latest => getAndSetOffsets(zkClient, OffsetRequest.LatestTime, config, args(2))
+ case _ => usage
+ }
+ }
+
+ private def getAndSetOffsets(zkClient: ZkClient, offsetOption: Long, config: ConsumerConfig, topic: String): Unit = {
+ val cluster = ZkUtils.getCluster(zkClient)
+ val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic).iterator)
+ var partitions: List[String] = Nil
+
+ partitionsPerTopicMap.get(topic) match {
+ case Some(l) => partitions = l.sortWith((s,t) => s < t)
+ case _ => throw new RuntimeException("Can't find topic " + topic)
+ }
+
+ var numParts = 0
+ for (partString <- partitions) {
+ val part = Partition.parse(partString)
+ val broker = cluster.getBroker(part.brokerId)
+ val consumer = new SimpleConsumer(broker.host, broker.port, 10000, 100 * 1024)
+ val offsets = consumer.getOffsetsBefore(topic, part.partId, offsetOption, 1)
+ val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
+
+ println("updating partition " + part.name + " with new offset: " + offsets(0))
+ ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + part.name, offsets(0).toString)
+ numParts += 1
+ }
+ println("updated the offset for " + numParts + " partitions")
+ }
+
+ private def usage() = {
+ println("USAGE: " + UpdateOffsetsInZK.getClass.getName + " [earliest | latest] consumer.properties topic")
+ System.exit(1)
+ }
+}
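
Arguments are positional, matching the usage() string above. A hypothetical invocation as a Scala script:

    // Hypothetical: jump the group's offsets for topic "events" to the latest
    // log segment offsets, reading zk.connect etc. from consumer.properties.
    kafka.utils.UpdateOffsetsInZK.main(Array("latest", "consumer.properties", "events"))
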
Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,684 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.io._
+import java.nio._
+import java.nio.channels._
+import java.util.concurrent.atomic._
+import java.lang.management._
+import java.util.zip.CRC32
+import org.apache.log4j.Logger
+import javax.management._
+import java.util.Properties
+import scala.collection._
+import scala.collection.mutable
+import kafka.message.{NoCompressionCodec, CompressionCodec}
+
+/**
+ * Helper functions!
+ */
+object Utils {
+ private val logger = Logger.getLogger(getClass())
+
+ /**
+ * Wrap the given function in a java.lang.Runnable
+ * @param fun A function
+ * @return A Runnable that just executes the function
+ */
+ def runnable(fun: () => Unit): Runnable =
+ new Runnable() {
+ def run() = fun()
+ }
+
+ /**
+ * Wrap the given function in a java.lang.Runnable that logs any errors encountered
+ * @param fun A function
+ * @return A Runnable that just executes the function
+ */
+ def loggedRunnable(fun: () => Unit): Runnable =
+ new Runnable() {
+ def run() = {
+ try {
+ fun()
+ }
+ catch {
+ case t =>
+ // log any error and the stack trace
+ logger.error(t, t)
+ logger.error(stackTrace(t), t)
+ }
+ }
+ }
+
+ /**
+ * Create a daemon thread
+ * @param name The name of the thread
+ * @param runnable The runnable to execute in the background
+ * @return The unstarted thread
+ */
+ def daemonThread(name: String, runnable: Runnable): Thread =
+ newThread(name, runnable, true)
+
+ /**
+ * Create a daemon thread
+ * @param name The name of the thread
+ * @param fun The function to execute in the thread
+ * @return The unstarted thread
+ */
+ def daemonThread(name: String, fun: () => Unit): Thread =
+ daemonThread(name, runnable(fun))
+
+ /**
+ * Create a new thread
+ * @param name The name of the thread
+ * @param runnable The work for the thread to do
+ * @param daemon If true, the thread is a daemon and will not block JVM shutdown
+ * @return The unstarted thread
+ */
+ def newThread(name: String, runnable: Runnable, daemon: Boolean): Thread = {
+ val thread = new Thread(runnable, name)
+ thread.setDaemon(daemon)
+ thread
+ }
+
+ /**
+ * Read a byte array from the given offset and size in the buffer
+ * TODO: Should use System.arraycopy
+ */
+ def readBytes(buffer: ByteBuffer, offset: Int, size: Int): Array[Byte] = {
+ val bytes = new Array[Byte](size)
+ var i = 0
+ while(i < size) {
+ bytes(i) = buffer.get(offset + i)
+ i += 1
+ }
+ bytes
+ }
+
+ /**
+ * Read size prefixed string where the size is stored as a 2 byte short.
+ * @param buffer The buffer to read from
+ * @param encoding The encoding in which to read the string
+ */
+ def readShortString(buffer: ByteBuffer, encoding: String): String = {
+ val size: Int = buffer.getShort()
+ if(size < 0)
+ return null
+ val bytes = new Array[Byte](size)
+ buffer.get(bytes)
+ new String(bytes, encoding)
+ }
+
+ /**
+ * Write a size prefixed string where the size is stored as a 2 byte short
+ * @param buffer The buffer to write to
+ * @param string The string to write
+ * @param encoding The encoding in which to write the string
+ */
+ def writeShortString(buffer: ByteBuffer, string: String, encoding: String): Unit = {
+ if(string == null) {
+ buffer.putShort(-1)
+ } else {
+ // measure the encoded form: multi-byte encodings can produce more bytes than
+ // characters, and the size prefix must match what readShortString will consume
+ val bytes = string.getBytes(encoding)
+ if(bytes.length > Short.MaxValue)
+ throw new IllegalArgumentException("String exceeds the maximum size of " + Short.MaxValue + ".")
+ buffer.putShort(bytes.length.asInstanceOf[Short])
+ buffer.put(bytes)
+ }
+ }
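+
+ // Illustrative round trip, assuming a heap buffer and UTF-8 encoding:
+ // val buf = ByteBuffer.allocate(64)
+ // Utils.writeShortString(buf, "hello", "UTF-8")
+ // buf.flip()
+ // Utils.readShortString(buf, "UTF-8") // returns "hello"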
+
+ /**
+ * Read a properties file from the given path
+ * @param filename The path of the file to read
+ */
+ def loadProps(filename: String): Properties = {
+ val propStream = new FileInputStream(filename)
+ try {
+ val props = new Properties()
+ props.load(propStream)
+ props
+ } finally {
+ // always release the file handle, even if parsing fails
+ propStream.close()
+ }
+ }
+
+ /**
+ * Read a required integer property value or throw an exception if no such property is found
+ */
+ def getInt(props: Properties, name: String): Int = {
+ if(props.containsKey(name))
+ return getInt(props, name, -1)
+ else
+ throw new IllegalArgumentException("Missing required property '" + name + "'")
+ }
+
+ /**
+ * Read an integer from the properties instance
+ * @param props The properties to read from
+ * @param name The property name
+ * @param default The default value to use if the property is not found
+ * @return the integer value
+ */
+ def getInt(props: Properties, name: String, default: Int): Int =
+ getIntInRange(props, name, default, (Int.MinValue, Int.MaxValue))
+
+ /**
+ * Read an integer from the properties instance. Throw an exception
+ * if the value is not in the given range (inclusive)
+ * @param props The properties to read from
+ * @param name The property name
+ * @param default The default value to use if the property is not found
+ * @param range The range in which the value must fall (inclusive)
+ * @throws IllegalArgumentException If the value is not in the given range
+ * @return the integer value
+ */
+ def getIntInRange(props: Properties, name: String, default: Int, range: (Int, Int)): Int = {
+ val v =
+ if(props.containsKey(name))
+ props.getProperty(name).toInt
+ else
+ default
+ if(v < range._1 || v > range._2)
+ throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range + ".")
+ else
+ v
+ }
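+
+ // Usage sketch ("port" is a hypothetical property name):
+ // val port = Utils.getIntInRange(props, "port", 9092, (1, 65535))
+ // throws IllegalArgumentException if the configured value falls outside 1..65535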
+
+ /**
+ * Read a boolean value from the properties instance
+ * @param props The properties to read from
+ * @param name The property name
+ * @param default The default value to use if the property is not found
+ * @return the boolean value
+ */
+ def getBoolean(props: Properties, name: String, default: Boolean): Boolean = {
+ if(!props.containsKey(name))
+ default
+ else if("true" == props.getProperty(name))
+ true
+ else if("false" == props.getProperty(name))
+ false
+ else
+ throw new IllegalArgumentException("Unacceptable value for property '" + name + "', boolean values must be either 'true' or 'false" )
+ }
+
+ /**
+ * Get a string property, or, if no such property is defined, return the given default value
+ */
+ def getString(props: Properties, name: String, default: String): String = {
+ if(props.containsKey(name))
+ props.getProperty(name)
+ else
+ default
+ }
+
+ /**
+ * Get a string property or throw an exception if no such property is defined.
+ */
+ def getString(props: Properties, name: String): String = {
+ if(props.containsKey(name))
+ props.getProperty(name)
+ else
+ throw new IllegalArgumentException("Missing required property '" + name + "'")
+ }
+
+ /**
+ * Get a property of type java.util.Properties or throw an exception if no such property is defined.
+ */
+ def getProps(props: Properties, name: String): Properties = {
+ if(props.containsKey(name)) {
+ val propString = props.getProperty(name)
+ val propValues = propString.split(",")
+ val properties = new Properties
+ for(i <- 0 until propValues.length) {
+ val prop = propValues(i).split("=")
+ if(prop.length != 2)
+ throw new IllegalArgumentException("Illegal format of specifying properties '" + propValues(i) + "'")
+ properties.put(prop(0), prop(1))
+ }
+ properties
+ }
+ else
+ throw new IllegalArgumentException("Missing required property '" + name + "'")
+ }
+
+ /**
+ * Get a property of type java.util.Properties or return the default if no such property is defined
+ */
+ def getProps(props: Properties, name: String, default: Properties): Properties = {
+ if(props.containsKey(name)) {
+ val propString = props.getProperty(name)
+ val propValues = propString.split(",")
+ if(propValues.length < 1)
+ throw new IllegalArgumentException("Illegal format of specifying properties '" + propString + "'")
+ val properties = new Properties
+ for(i <- 0 until propValues.length) {
+ val prop = propValues(i).split("=")
+ if(prop.length != 2)
+ throw new IllegalArgumentException("Illegal format of specifying properties '" + propValues(i) + "'")
+ properties.put(prop(0), prop(1))
+ }
+ properties
+ }
+ else
+ default
+ }
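+
+ // Both overloads expect the value in the form "key1=value1,key2=value2".
+ // Illustrative only ("overrides" is a hypothetical property name):
+ // props.put("overrides", "a=1,b=2")
+ // Utils.getProps(props, "overrides").getProperty("a") // returns "1"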
+
+ /**
+ * Open a channel for the given file
+ */
+ def openChannel(file: File, mutable: Boolean): FileChannel = {
+ if(mutable)
+ new RandomAccessFile(file, "rw").getChannel()
+ else
+ new FileInputStream(file).getChannel()
+ }
+
+ /**
+ * Do the given action and log any exceptions thrown without rethrowing them
+ * @param log The log method to use for logging. E.g. logger.warn
+ * @param action The action to execute
+ */
+ def swallow(log: (Object, Throwable) => Unit, action: => Unit) = {
+ try {
+ action
+ } catch {
+ case e: Throwable => log(e.getMessage(), e)
+ }
+ }
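+
+ // Usage sketch: close a channel without letting an exception propagate,
+ // recording it at warn level instead:
+ // Utils.swallow(logger.warn, channel.close())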
+
+ /**
+ * Test if two byte buffers are equal. In this case equality means having
+ * the same bytes from the current position to the limit
+ */
+ def equal(b1: ByteBuffer, b2: ByteBuffer): Boolean = {
+ // two byte buffers are equal if their position is the same,
+ // their remaining bytes are the same, and their contents are the same
+ if(b1.position != b2.position)
+ return false
+ if(b1.remaining != b2.remaining)
+ return false
+ for(i <- 0 until b1.remaining)
+ if(b1.get(i) != b2.get(i))
+ return false
+ return true
+ }
+
+ /**
+ * Translate the given buffer into a string
+ * @param buffer The buffer to translate
+ * @param encoding The encoding to use in translating bytes to characters
+ */
+ def toString(buffer: ByteBuffer, encoding: String): String = {
+ val bytes = new Array[Byte](buffer.remaining)
+ buffer.get(bytes)
+ new String(bytes, encoding)
+ }
+
+ /**
+ * Print an error message and shutdown the JVM
+ * @param message The error message
+ */
+ def croak(message: String) {
+ System.err.println(message)
+ System.exit(1)
+ }
+
+ /**
+ * Recursively delete the given file/directory and any subfiles (if any exist)
+ * @param file The root file at which to begin deleting
+ */
+ def rm(file: String): Unit = rm(new File(file))
+
+ /**
+ * Recursively delete the given file/directory and any subfiles (if any exist)
+ * @param file The root file at which to begin deleting
+ */
+ def rm(file: File): Unit = {
+ if(file == null) {
+ return
+ } else if(file.isDirectory) {
+ val files = file.listFiles()
+ if(files != null) {
+ for(f <- files)
+ rm(f)
+ }
+ file.delete()
+ } else {
+ file.delete()
+ }
+ }
+
+ /**
+ * Register the given mbean with the platform mbean server,
+ * unregistering any mbean that was there before
+ * @param mbean The object to register as an mbean
+ * @param name The name to register this mbean with
+ */
+ def registerMBean(mbean: Object, name: String) {
+ val mbs = ManagementFactory.getPlatformMBeanServer()
+ mbs synchronized {
+ val objName = new ObjectName(name)
+ if(mbs.isRegistered(objName))
+ mbs.unregisterMBean(objName)
+ mbs.registerMBean(mbean, objName)
+ }
+ }
+
+ /**
+ * Unregister the mbean with the given name, if there is one registered
+ * @param name The mbean name to unregister
+ */
+ def unregisterMBean(name: String) {
+ val mbs = ManagementFactory.getPlatformMBeanServer()
+ mbs synchronized {
+ val objName = new ObjectName(name)
+ if(mbs.isRegistered(objName))
+ mbs.unregisterMBean(objName)
+ }
+ }
+
+ /**
+ * Read an unsigned integer from the current position in the buffer,
+ * incrementing the position by 4 bytes
+ * @param buffer The buffer to read from
+ * @return The integer read, as a long to avoid signedness
+ */
+ def getUnsignedInt(buffer: ByteBuffer): Long =
+ buffer.getInt() & 0xffffffffL
+
+ /**
+ * Read an unsigned integer from the given position without modifying the buffer's
+ * position
+ * @param buffer The buffer to read from
+ * @param index The index from which to read the integer
+ * @return The integer read, as a long to avoid signedness
+ */
+ def getUnsignedInt(buffer: ByteBuffer, index: Int): Long =
+ buffer.getInt(index) & 0xffffffffL
+
+ /**
+ * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
+ * @param buffer The buffer to write to
+ * @param value The value to write
+ */
+ def putUnsignedInt(buffer: ByteBuffer, value: Long): Unit =
+ buffer.putInt((value & 0xffffffffL).asInstanceOf[Int])
+
+ /**
+ * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
+ * @param buffer The buffer to write to
+ * @param index The position in the buffer at which to begin writing
+ * @param value The value to write
+ */
+ def putUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit =
+ buffer.putInt(index, (value & 0xffffffffL).asInstanceOf[Int])
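+
+ // Illustrative round trip: any value in 0..2^32-1 survives the int cast:
+ // val buf = ByteBuffer.allocate(4)
+ // Utils.putUnsignedInt(buf, 0, 0xFFFFFFFFL)
+ // Utils.getUnsignedInt(buf, 0) // returns 4294967295L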
+
+ /**
+ * Compute the CRC32 of the byte array
+ * @param bytes The array to compute the checksum for
+ * @return The CRC32
+ */
+ def crc32(bytes: Array[Byte]): Long = crc32(bytes, 0, bytes.length)
+
+ /**
+ * Compute the CRC32 of the segment of the byte array given by the specified size and offset
+ * @param bytes The bytes to checksum
+ * @param offset The offset at which to begin checksumming
+ * @param size The number of bytes to checksum
+ * @return The CRC32
+ */
+ def crc32(bytes: Array[Byte], offset: Int, size: Int): Long = {
+ val crc = new CRC32()
+ crc.update(bytes, offset, size)
+ crc.getValue()
+ }
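+
+ // Illustrative: checksum an arbitrary payload; the result is always
+ // non-negative because it is widened to a Long:
+ // Utils.crc32("hello".getBytes("UTF-8"))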
+
+ /**
+ * Compute the hash code for the given items
+ */
+ def hashcode(as: Any*): Int = {
+ if(as == null)
+ return 0
+ var h = 1
+ var i = 0
+ while(i < as.length) {
+ if(as(i) != null)
+ h = 31 * h + as(i).hashCode
+ // advance unconditionally; incrementing only for non-null elements would loop forever
+ i += 1
+ }
+ return h
+ }
+
+ /**
+ * Group the given values by keys extracted with the given function
+ */
+ def groupby[K,V](vals: Iterable[V], f: V => K): Map[K,List[V]] = {
+ val m = new mutable.HashMap[K, List[V]]
+ for(v <- vals) {
+ val k = f(v)
+ m.get(k) match {
+ case Some(l) => m.put(k, v :: l)
+ case None => m.put(k, List(v))
+ }
+ }
+ m
+ }
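+
+ // Illustrative: group words by length. Each list holds its values in
+ // reverse iteration order because new values are prepended:
+ // Utils.groupby(List("a", "bb", "cc"), (s: String) => s.length)
+ // returns Map(1 -> List("a"), 2 -> List("cc", "bb"))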
+
+ /**
+ * Read some bytes into the provided buffer, and return the number of bytes read. If the
+ * channel has been closed or we get -1 on the read for any reason, throw an EOFException
+ */
+ def read(channel: ReadableByteChannel, buffer: ByteBuffer): Int = {
+ channel.read(buffer) match {
+ case -1 => throw new EOFException("Received -1 when reading from channel, socket has likely been closed.")
+ case n: Int => n
+ }
+ }
+
+ def notNull[V](v: V) = {
+ if(v == null)
+ throw new IllegalArgumentException("Value cannot be null.")
+ else
+ v
+ }
+
+ def getHostPort(hostport: String) : Tuple2[String, Int] = {
+ val splits = hostport.split(":")
+ (splits(0), splits(1).toInt)
+ }
+
+ def getTopicPartition(topicPartition: String) : Tuple2[String, Int] = {
+ val index = topicPartition.lastIndexOf('-')
+ (topicPartition.substring(0,index), topicPartition.substring(index+1).toInt)
+ }
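+
+ // Illustrative: the partition name format is "<brokerid>-<partition>",
+ // split on the last dash so topic names may themselves contain dashes:
+ // Utils.getHostPort("localhost:9092") // ("localhost", 9092)
+ // Utils.getTopicPartition("my-topic-0") // ("my-topic", 0)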
+
+ def stackTrace(e: Throwable): String = {
+ val sw = new StringWriter
+ val pw = new PrintWriter(sw)
+ e.printStackTrace(pw)
+ sw.toString()
+ }
+
+ /**
+ * Parse a string of comma-separated key:value pairs into a map of key/value
+ * pairs. The format of allCSVals is key1:val1,key2:val2,...
+ */
+ private def getCSVMap[K, V](allCSVals: String, exceptionMsg:String, successMsg:String) :Map[K, V] = {
+ val map = new mutable.HashMap[K, V]
+ if("".equals(allCSVals))
+ return map
+ val csVals = allCSVals.split(",")
+ for(i <- 0 until csVals.length)
+ {
+ try{
+ val tempSplit = csVals(i).split(":")
+ logger.info(successMsg + tempSplit(0) + " : " + Integer.parseInt(tempSplit(1).trim))
+ map += tempSplit(0).asInstanceOf[K] -> Integer.parseInt(tempSplit(1).trim).asInstanceOf[V]
+ } catch {
+ case _ => logger.error(exceptionMsg + ": " + csVals(i))
+ }
+ }
+ map
+ }
+
+ def getCSVList(csvList: String): Seq[String] = {
+ if(csvList == null)
+ Seq.empty[String]
+ else {
+ csvList.split(",").filter(v => !v.equals(""))
+ }
+ }
+
+ def getTopicRentionHours(retentionHours: String) : Map[String, Int] = {
+ val exceptionMsg = "Malformed token for topic.log.retention.hours in server.properties: "
+ val successMsg = "The retention hour for "
+ getCSVMap(retentionHours, exceptionMsg, successMsg)
+ }
+
+ def getTopicFlushIntervals(allIntervals: String) : Map[String, Int] = {
+ val exceptionMsg = "Malformed token for topic.flush.Intervals.ms in server.properties: "
+ val successMsg = "The flush interval for "
+ getCSVMap(allIntervals, exceptionMsg, successMsg)
+ }
+
+ def getTopicPartitions(allPartitions: String) : Map[String, Int] = {
+ val exceptionMsg = "Malformed token for topic.partition.counts in server.properties: "
+ val successMsg = "The number of partitions for topic "
+ getCSVMap(allPartitions, exceptionMsg, successMsg)
+ }
+
+ def getConsumerTopicMap(consumerTopicString: String) : Map[String, Int] = {
+ val exceptionMsg = "Malformed token for embeddedconsumer.topics in consumer.properties: "
+ val successMsg = "The number of consumer thread for topic "
+ getCSVMap(consumerTopicString, exceptionMsg, successMsg)
+ }
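+
+ // Illustrative: each helper above parses values of the form
+ // "topic1:n1,topic2:n2" (topic names here are hypothetical):
+ // Utils.getTopicPartitions("orders:4,clicks:8")
+ // returns Map("orders" -> 4, "clicks" -> 8)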
+
+ def getObject[T<:AnyRef](className: String): T = {
+ className match {
+ case null => null.asInstanceOf[T]
+ case _ =>
+ val clazz = Class.forName(className)
+ val clazzT = clazz.asInstanceOf[Class[T]]
+ val constructors = clazzT.getConstructors
+ require(constructors.length == 1)
+ constructors.head.newInstance().asInstanceOf[T]
+ }
+ }
+
+ def propertyExists(prop: String): Boolean =
+ prop != null && !prop.equals("")
+
+ def getCompressionCodec(props: Properties, codec: String): CompressionCodec = {
+ val codecValueString = props.getProperty(codec)
+ if(codecValueString == null)
+ NoCompressionCodec
+ else
+ CompressionCodec.getCompressionCodec(codecValueString.toInt)
+ }
+}
+
+class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {
+ private val time: Time = SystemTime
+
+ private val complete = new AtomicReference(new Stats())
+ private val current = new AtomicReference(new Stats())
+ private val numCumulatedRequests = new AtomicLong(0)
+
+ def recordRequestMetric(requestNs: Long) {
+ val stats = current.get
+ stats.add(requestNs)
+ numCumulatedRequests.getAndAdd(1)
+ val ageNs = time.nanoseconds - stats.start
+ // if the current stats are too old it is time to swap
+ if(ageNs >= monitorDurationNs) {
+ val swapped = current.compareAndSet(stats, new Stats())
+ if(swapped) {
+ complete.set(stats)
+ stats.end.set(time.nanoseconds)
+ }
+ }
+ }
+
+ def recordThroughputMetric(data: Long) {
+ val stats = current.get
+ stats.addData(data)
+ val ageNs = time.nanoseconds - stats.start
+ // if the current stats are too old it is time to swap
+ if(ageNs >= monitorDurationNs) {
+ val swapped = current.compareAndSet(stats, new Stats())
+ if(swapped) {
+ complete.set(stats)
+ stats.end.set(time.nanoseconds)
+ }
+ }
+ }
+
+ def getNumRequests(): Long = numCumulatedRequests.get
+
+ def getRequestsPerSecond: Double = {
+ val stats = complete.get
+ stats.numRequests / stats.durationSeconds
+ }
+
+ def getThroughput: Double = {
+ val stats = complete.get
+ stats.totalData / stats.durationSeconds
+ }
+
+ def getAvgMetric: Double = {
+ val stats = complete.get
+ if (stats.numRequests == 0) {
+ 0
+ }
+ else {
+ stats.totalRequestMetric / stats.numRequests
+ }
+ }
+
+ def getMaxMetric: Double = complete.get.maxRequestMetric
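+
+ // Usage sketch: record per-request latencies in nanoseconds; the getters
+ // read the last completed window, so they return data only after at least
+ // one monitoring window has elapsed and been swapped out:
+ // val stats = new SnapshotStats(60L * 1000 * 1000 * 1000) // 1 minute window
+ // stats.recordRequestMetric(elapsedNs) // elapsedNs measured by the caller
+ // val avgNs = stats.getAvgMetric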
+
+ class Stats {
+ val start = time.nanoseconds
+ val end = new AtomicLong(-1)
+ var numRequests = 0
+ var totalRequestMetric: Long = 0L
+ var maxRequestMetric: Long = 0L
+ var totalData: Long = 0L
+ private val lock = new Object()
+
+ def addData(data: Long) {
+ lock synchronized {
+ totalData += data
+ }
+ }
+
+ def add(requestNs: Long) {
+ lock synchronized {
+ numRequests += 1
+ totalRequestMetric += requestNs
+ maxRequestMetric = scala.math.max(maxRequestMetric, requestNs)
+ }
+ }
+
+ def durationSeconds: Double = (end.get - start) / (1000.0 * 1000.0 * 1000.0)
+
+ def durationMs: Double = (end.get - start) / (1000.0 * 1000.0)
+ }
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,280 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import org.I0Itec.zkclient.ZkClient
+import org.I0Itec.zkclient.serialize.ZkSerializer
+import kafka.cluster.{Broker, Cluster}
+import scala.collection._
+import java.util.Properties
+import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
+import org.apache.log4j.Logger
+
+object ZkUtils {
+ val ConsumersPath = "/consumers"
+ val BrokerIdsPath = "/brokers/ids"
+ val BrokerTopicsPath = "/brokers/topics"
+ private val logger = Logger.getLogger(getClass())
+
+ /**
+ * Make sure a persistent path exists in ZK. Create the path if it does not exist.
+ */
+ def makeSurePersistentPathExists(client: ZkClient, path: String) {
+ if (!client.exists(path))
+ client.createPersistent(path, true) // won't throw NoNodeException or NodeExistsException
+ }
+
+ /**
+ * create the parent path
+ */
+ private def createParentPath(client: ZkClient, path: String): Unit = {
+ val parentDir = path.substring(0, path.lastIndexOf('/'))
+ if (parentDir.length != 0)
+ client.createPersistent(parentDir, true)
+ }
+
+ /**
+ * Create an ephemeral node with the given path and data. Create parents if necessary.
+ */
+ private def createEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
+ try {
+ client.createEphemeral(path, data)
+ }
+ catch {
+ case e: ZkNoNodeException => {
+ createParentPath(client, path)
+ client.createEphemeral(path, data)
+ }
+ }
+ }
+
+ /**
+ * Create an ephemeral node with the given path and data.
+ * Throws ZkNodeExistsException if the node already exists.
+ */
+ def createEphemeralPathExpectConflict(client: ZkClient, path: String, data: String): Unit = {
+ try {
+ createEphemeralPath(client, path, data)
+ }
+ catch {
+ case e: ZkNodeExistsException => {
+ // this can happen when there is connection loss; make sure the data is what we intend to write
+ var storedData: String = null
+ try {
+ storedData = readData(client, path)
+ }
+ catch {
+ case e1: ZkNoNodeException => // the node disappeared; treat as if the node existed and let the caller handle this
+ case e2 => throw e2
+ }
+ if (storedData == null || storedData != data) {
+ logger.info("conflict in " + path + " data: " + data + " stored data: " + storedData)
+ throw e
+ }
+ else {
+ // otherwise, the creation succeeded, return normally
+ logger.info(path + " exists with value " + data + " during connection loss; this is ok")
+ }
+ }
+ case e2 => throw e2
+ }
+ }
+
+ /**
+ * Update the value of a persistent node with the given path and data.
+ * Create the parent directory if necessary. Never throws ZkNodeExistsException.
+ */
+ def updatePersistentPath(client: ZkClient, path: String, data: String): Unit = {
+ try {
+ client.writeData(path, data)
+ }
+ catch {
+ case e: ZkNoNodeException => {
+ createParentPath(client, path)
+ try {
+ client.createPersistent(path, data)
+ }
+ catch {
+ case e: ZkNodeExistsException => client.writeData(path, data)
+ case e2 => throw e2
+ }
+ }
+ case e2 => throw e2
+ }
+ }
+
+ /**
+ * Update the value of an ephemeral node with the given path and data.
+ * Create the parent directory if necessary.
+ */
+ def updateEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
+ try {
+ client.writeData(path, data)
+ }
+ catch {
+ case e: ZkNoNodeException => {
+ createParentPath(client, path)
+ client.createEphemeral(path, data)
+ }
+ case e2 => throw e2
+ }
+ }
+
+ def deletePath(client: ZkClient, path: String) {
+ try {
+ client.delete(path)
+ }
+ catch {
+ case e: ZkNoNodeException =>
+ // this can happen during a connection loss event, return normally
+ logger.info(path + " deleted during connection loss; this is ok")
+ case e2 => throw e2
+ }
+ }
+
+ def deletePathRecursive(client: ZkClient, path: String) {
+ try {
+ client.deleteRecursive(path)
+ }
+ catch {
+ case e: ZkNoNodeException =>
+ // this can happen during a connection loss event, return normally
+ logger.info(path + " deleted during connection loss; this is ok")
+ case e2 => throw e2
+ }
+ }
+
+ def readData(client: ZkClient, path: String): String = {
+ client.readData(path)
+ }
+
+ def readDataMaybeNull(client: ZkClient, path: String): String = {
+ client.readData(path, true)
+ }
+
+ def getChildren(client: ZkClient, path: String): Seq[String] = {
+ import scala.collection.JavaConversions._
+ // triggers implicit conversion from java list to scala Seq
+ client.getChildren(path)
+ }
+
+ def getChildrenParentMayNotExist(client: ZkClient, path: String): Seq[String] = {
+ import scala.collection.JavaConversions._
+ // triggers implicit conversion from java list to scala Seq
+ try {
+ client.getChildren(path)
+ }
+ catch {
+ case e: ZkNoNodeException => Nil
+ case e2 => throw e2
+ }
+ }
+
+ /**
+ * Check if the given path exists
+ */
+ def pathExists(client: ZkClient, path: String): Boolean = {
+ client.exists(path)
+ }
+
+ def getLastPart(path : String) : String = path.substring(path.lastIndexOf('/') + 1)
+
+ def getCluster(zkClient: ZkClient) : Cluster = {
+ val cluster = new Cluster
+ val nodes = getChildrenParentMayNotExist(zkClient, BrokerIdsPath)
+ for (node <- nodes) {
+ val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node)
+ cluster.add(Broker.createBroker(node.toInt, brokerZKString))
+ }
+ cluster
+ }
+
+ def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, List[String]] = {
+ val ret = new mutable.HashMap[String, List[String]]()
+ for (topic <- topics) {
+ var partList: List[String] = Nil
+ val brokers = getChildrenParentMayNotExist(zkClient, BrokerTopicsPath + "/" + topic)
+ for (broker <- brokers) {
+ val nParts = readData(zkClient, BrokerTopicsPath + "/" + topic + "/" + broker).toInt
+ for (part <- 0 until nParts)
+ partList ::= broker + "-" + part
+ }
+ partList = partList.sortWith((s,t) => s < t)
+ ret += (topic -> partList)
+ }
+ ret
+ }
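+
+ // Illustrative: with broker 0 hosting two partitions of topic "test",
+ // the returned map would contain "test" -> List("0-0", "0-1").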
+
+ def setupPartition(zkClient : ZkClient, brokerId: Int, host: String, port: Int, topic: String, nParts: Int) {
+ val brokerIdPath = BrokerIdsPath + "/" + brokerId
+ val broker = new Broker(brokerId, brokerId.toString, host, port)
+ createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
+ val brokerPartTopicPath = BrokerTopicsPath + "/" + topic + "/" + brokerId
+ createEphemeralPathExpectConflict(zkClient, brokerPartTopicPath, nParts.toString)
+ }
+
+ def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) {
+ val brokerIdPath = BrokerIdsPath + "/" + brokerId
+ zkClient.delete(brokerIdPath)
+ val brokerPartTopicPath = BrokerTopicsPath + "/" + topic + "/" + brokerId
+ zkClient.delete(brokerPartTopicPath)
+ }
+}
+
+object StringSerializer extends ZkSerializer {
+
+ @throws(classOf[ZkMarshallingError])
+ def serialize(data : Object) : Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8")
+
+ @throws(classOf[ZkMarshallingError])
+ def deserialize(bytes : Array[Byte]) : Object = {
+ if (bytes == null)
+ null
+ else
+ new String(bytes, "UTF-8")
+ }
+}
+
+class ZKGroupDirs(val group: String) {
+ def consumerDir = ZkUtils.ConsumersPath
+ def consumerGroupDir = consumerDir + "/" + group
+ def consumerRegistryDir = consumerGroupDir + "/ids"
+}
+
+class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group) {
+ def consumerOffsetDir = consumerGroupDir + "/offsets/" + topic
+ def consumerOwnerDir = consumerGroupDir + "/owners/" + topic
+}
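+
+// Illustrative: new ZKGroupTopicDirs("group1", "test").consumerOffsetDir
+// evaluates to "/consumers/group1/offsets/test".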
+
+
+class ZKConfig(props: Properties) {
+ /** ZK host string */
+ val zkConnect = Utils.getString(props, "zk.connect", null)
+
+ /** zookeeper session timeout */
+ val zkSessionTimeoutMs = Utils.getInt(props, "zk.sessiontimeout.ms", 6000)
+
+ /** the max time that the client waits to establish a connection to zookeeper */
+ val zkConnectionTimeoutMs = Utils.getInt(props, "zk.connectiontimeout.ms", zkSessionTimeoutMs)
+
+ /** how far a ZK follower can be behind a ZK leader */
+ val zkSyncTimeMs = Utils.getInt(props, "zk.synctime.ms", 2000)
+}