Posted to commits@kafka.apache.org by jk...@apache.org on 2011/08/02 01:42:17 UTC

svn commit: r1152970 [20/26] - in /incubator/kafka: branches/ site/ trunk/ trunk/bin/ trunk/clients/ trunk/clients/clojure/ trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/ trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...

Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,277 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import kafka.utils.Utils
+import java.util.concurrent.{CountDownLatch, Executors}
+import java.util.concurrent.atomic.AtomicLong
+import kafka.producer._
+import async.DefaultEventHandler
+import kafka.serializer.StringEncoder
+import org.apache.log4j.Logger
+import joptsimple.{OptionSet, OptionParser}
+import java.util.{Random, Properties}
+import kafka.message.{CompressionCodec, Message, ByteBufferMessageSet}
+
+/**
+ * Load test for the producer
+ */
+object ProducerPerformance {
+
+  def main(args: Array[String]) {
+
+    val logger = Logger.getLogger(getClass)
+    val config = new PerfConfig(args)
+    if(!config.isFixSize)
+      logger.info("WARN: Throughput will be slower due to changing message size per request")
+
+    val totalBytesSent = new AtomicLong(0)
+    val totalMessagesSent = new AtomicLong(0)
+    val executor = Executors.newFixedThreadPool(config.numThreads)
+    val allDone = new CountDownLatch(config.numThreads)
+    val startMs = System.currentTimeMillis
+    val rand = new java.util.Random
+
+    for(i <- 0 until config.numThreads) {
+      if(config.isAsync)
+        executor.execute(new AsyncProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))
+      else
+        executor.execute(new SyncProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))
+    }
+
+    allDone.await()
+    val elapsedSecs = (System.currentTimeMillis - startMs) / 1000.0
+    logger.info("Total Num Messages: " + totalMessagesSent.get + " bytes: " + totalBytesSent.get + " in " + elapsedSecs + " secs")
+    logger.info("Messages/sec: " + (1.0 * totalMessagesSent.get / elapsedSecs).formatted("%.4f"))
+    logger.info("MB/sec: " + (totalBytesSent.get / elapsedSecs / (1024.0*1024.0)).formatted("%.4f"))
+    System.exit(0)
+  }
+
+  class PerfConfig(args: Array[String]) {
+    val parser = new OptionParser
+    val brokerInfoOpt = parser.accepts("brokerinfo", "REQUIRED: broker info (either a zookeeper connection string or a static broker list).")
+      .withRequiredArg
+      .describedAs("broker.list=brokerid:hostname:port or zk.connect=host:port")
+      .ofType(classOf[String])
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to produce to.")
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
+    val numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to send.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+    val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(100)
+    val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
+    val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
+    val delayMSBtwBatchOpt = parser.accepts("delay-btw-batch-ms", "Delay in ms between 2 batch sends.")
+      .withRequiredArg
+      .describedAs("ms")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(0)
+    val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(200)
+    val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(10)
+    val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(5000)
+    val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed")
+      .withRequiredArg
+      .describedAs("compression codec ")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
+
+    val options = parser.parse(args : _*)
+    for(arg <- List(brokerInfoOpt, topicOpt, numMessagesOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+    val brokerInfo = options.valueOf(brokerInfoOpt)
+    val numMessages = options.valueOf(numMessagesOpt).intValue
+    val messageSize = options.valueOf(messageSizeOpt).intValue
+    val isFixSize = !options.has(varyMessageSizeOpt)
+    val isAsync = options.has(asyncOpt)
+    val delayedMSBtwSend = options.valueOf(delayMSBtwBatchOpt).longValue
+    var batchSize = options.valueOf(batchSizeOpt).intValue
+    val numThreads = options.valueOf(numThreadsOpt).intValue
+    val topic = options.valueOf(topicOpt)
+    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
+    val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
+  }
+
+  private def getStringOfLength(len: Int) : String = {
+    val strArray = new Array[Char](len)
+    for (i <- 0 until len)
+      strArray(i) = 'x'
+    return new String(strArray)
+  }
+
+  class AsyncProducerThread(val threadId: Int,
+                            val config: PerfConfig,
+                            val totalBytesSent: AtomicLong,
+                            val totalMessagesSent: AtomicLong,
+                            val allDone: CountDownLatch,
+                            val rand: Random) extends Runnable {
+    val logger = Logger.getLogger(getClass)
+    val brokerInfoList = config.brokerInfo.split("=")
+    val props = new Properties()
+    if (brokerInfoList(0) == "zk.connect")
+      props.put("zk.connect", brokerInfoList(1))
+    else
+      props.put("broker.list", brokerInfoList(1))
+
+    props.put("compression.codec", config.compressionCodec.codec.toString)
+    props.put("producer.type","async")
+    props.put("batch.size", config.batchSize.toString)
+    props.put("reconnect.interval", Integer.MAX_VALUE.toString)
+    props.put("buffer.size", (64*1024).toString)
+
+    logger.info("Producer properties = " + props.toString)
+
+    val producerConfig = new ProducerConfig(props)
+    val producer = new Producer[String, String](producerConfig, new StringEncoder,
+      new DefaultEventHandler[String](producerConfig, null), null, new DefaultPartitioner[String])
+
+    override def run {
+      var bytesSent = 0L
+      var lastBytesSent = 0L
+      var nSends = 0
+      var lastNSends = 0
+      var message = getStringOfLength(config.messageSize)
+      var reportTime = System.currentTimeMillis()
+      var lastReportTime = reportTime
+      val messagesPerThread = config.numMessages / config.numThreads
+      logger.info("Messages per thread = " + messagesPerThread)
+      for(j <- 0 until messagesPerThread) {
+        var strLength = config.messageSize
+        if (!config.isFixSize) {
+            strLength = rand.nextInt(config.messageSize)
+            message = getStringOfLength(strLength)
+            bytesSent += strLength
+        }else
+          bytesSent += config.messageSize
+        try  {
+          producer.send(new ProducerData[String,String](config.topic, message))
+          if (config.delayedMSBtwSend > 0 && (nSends + 1) % config.batchSize == 0)
+            Thread.sleep(config.delayedMSBtwSend)
+          nSends += 1
+        }catch {
+          case e: Exception => e.printStackTrace
+        }
+        if(nSends % config.reportingInterval == 0) {
+          reportTime = System.currentTimeMillis()
+          logger.info("thread " + threadId + ": " + nSends + " messages sent "
+            + (1000.0 * (nSends - lastNSends) / (reportTime - lastReportTime)).formatted("%.4f") + " nMsg/sec "
+            + (1000.0 * (bytesSent - lastBytesSent) / (reportTime - lastReportTime) / (1024 * 1024)).formatted("%.4f") + " MBs/sec")
+          lastReportTime = reportTime
+          lastBytesSent = bytesSent
+          lastNSends = nSends
+        }
+      }
+      producer.close()
+      totalBytesSent.addAndGet(bytesSent)
+      totalMessagesSent.addAndGet(nSends)
+      allDone.countDown()
+    }
+  }
+
+  class SyncProducerThread(val threadId: Int,
+                           val config: PerfConfig,
+                           val totalBytesSent: AtomicLong,
+                           val totalMessagesSent: AtomicLong,
+                           val allDone: CountDownLatch,
+                           val rand: Random) extends Runnable {
+    val logger = Logger.getLogger(getClass)
+    val props = new Properties()
+    val brokerInfoList = config.brokerInfo.split("=")
+    if (brokerInfoList(0) == "zk.connect")
+      props.put("zk.connect", brokerInfoList(1))
+    else
+      props.put("broker.list", brokerInfoList(1))
+    props.put("compression.codec", config.compressionCodec.toString)
+    props.put("reconnect.interval", Integer.MAX_VALUE.toString)
+    props.put("buffer.size", (64*1024).toString)
+
+    val producerConfig = new ProducerConfig(props)
+    val producer = new Producer[String, String](producerConfig, new StringEncoder,
+      new DefaultEventHandler[String](producerConfig, null), null, new DefaultPartitioner[String])
+
+    override def run {
+      var bytesSent = 0L
+      var lastBytesSent = 0L
+      var nSends = 0
+      var lastNSends = 0
+      val message = getStringOfLength(config.messageSize)
+      var reportTime = System.currentTimeMillis()
+      var lastReportTime = reportTime
+      val messagesPerThread = config.numMessages / config.numThreads / config.batchSize
+      logger.info("Messages per thread = " + messagesPerThread)
+      var messageSet: List[String] = Nil
+      for(k <- 0 until config.batchSize) {
+        messageSet ::= message
+      }
+      for(j <- 0 until messagesPerThread) {
+        var strLength = config.messageSize
+        if (!config.isFixSize) {
+          messageSet = Nil
+          for(k <- 0 until config.batchSize) {
+            strLength = rand.nextInt(config.messageSize)
+            messageSet ::= getStringOfLength(strLength)
+            bytesSent += strLength
+          }
+        }else
+          bytesSent += config.batchSize*config.messageSize
+        try  {
+          producer.send(new ProducerData[String,String](config.topic, messageSet))
+          if (config.delayedMSBtwSend > 0 && (nSends + 1) % config.batchSize == 0)
+            Thread.sleep(config.delayedMSBtwSend)
+          nSends += 1
+        }catch {
+          case e: Exception => e.printStackTrace
+        }
+        if(nSends % config.reportingInterval == 0) {
+          reportTime = System.currentTimeMillis()
+          logger.info("thread " + threadId + ": " + nSends + " messages sent "
+            + (1000.0 * (nSends - lastNSends) * config.batchSize / (reportTime - lastReportTime)).formatted("%.4f") + " nMsg/sec "
+            + (1000.0 * (bytesSent - lastBytesSent) / (reportTime - lastReportTime) / (1024 * 1024)).formatted("%.4f") + " MBs/sec")
+          lastReportTime = reportTime
+          lastBytesSent = bytesSent
+          lastNSends = nSends
+        }
+      }
+      producer.close()
+      totalBytesSent.addAndGet(bytesSent)
+      totalMessagesSent.addAndGet(nSends*config.batchSize)
+      allDone.countDown()
+    }
+  }
+}
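
A minimal usage sketch of the two forms accepted by --brokerinfo, mirroring the split("=") handling in the producer threads above (the host and port values are placeholders):

    import java.util.Properties

    // either form is accepted; the part before "=" selects the property to set
    val viaZookeeper  = "zk.connect=localhost:2181"
    val viaStaticList = "broker.list=0:localhost:9092"

    def brokerInfoToProps(brokerInfo: String): Properties = {
      val props = new Properties()
      val pieces = brokerInfo.split("=")
      if (pieces(0) == "zk.connect")
        props.put("zk.connect", pieces(1))
      else
        props.put("broker.list", pieces(1))
      props
    }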

Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.net.URI
+import java.io._
+import joptsimple._
+import kafka.message._
+import kafka.producer._
+import java.util.Properties
+
+/**
+ * Interactive shell for producing messages from the command line
+ */
+object ProducerShell {
+
+  def main(args: Array[String]) {
+    
+    val parser = new OptionParser
+    val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
+                           .withRequiredArg
+                           .describedAs("kafka://hostname:port")
+                           .ofType(classOf[String])
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to produce to.")
+                           .withRequiredArg
+                           .describedAs("topic")
+                           .ofType(classOf[String])
+    
+    val options = parser.parse(args : _*)
+    
+    for(arg <- List(urlOpt, topicOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"") 
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+    
+    val url = new URI(options.valueOf(urlOpt))
+    val topic = options.valueOf(topicOpt)
+    val props = new Properties()
+    props.put("host", url.getHost)
+    props.put("port", url.getPort.toString)
+    props.put("buffer.size", "65536")
+    props.put("connect.timeout.ms", "10000")
+    props.put("reconnect.interval", "100")
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+
+    val input = new BufferedReader(new InputStreamReader(System.in))
+    var done = false
+    while(!done) {
+      val line = input.readLine()
+      if(line == null) {
+        done = true
+      } else {
+        val lineBytes = line.trim.getBytes()
+        val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(lineBytes))
+        producer.send(topic, messageList)
+        println("Sent: %d (%ld) bytes)".format(line, messageList.sizeInBytes))
+      }
+    }
+    producer.close()
+  }
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,197 @@
+package kafka.tools
+
+import java.io.File
+import joptsimple.OptionParser
+import org.apache.log4j.Logger
+import java.util.concurrent.{Executors, CountDownLatch}
+import java.util.Properties
+import kafka.producer.async.DefaultEventHandler
+import kafka.serializer.{DefaultEncoder, StringEncoder}
+import kafka.producer.{ProducerData, DefaultPartitioner, ProducerConfig, Producer}
+import kafka.consumer._
+import kafka.utils.{StringSerializer, Utils}
+import kafka.api.OffsetRequest
+import org.I0Itec.zkclient._
+import kafka.message.{CompressionCodec, Message, MessageSet, FileMessageSet}
+
+object ReplayLogProducer {
+
+  private val GROUPID: String = "replay-log-producer"
+  private val logger = Logger.getLogger(getClass)
+
+  def main(args: Array[String]) {
+    var isNoPrint = false;
+
+    val config = new Config(args)
+
+    val executor = Executors.newFixedThreadPool(config.numThreads)
+    val allDone = new CountDownLatch(config.numThreads)
+
+    // the tool uses a fixed, throwaway group id, so clean up any stale group data in zookeeper up front (this is a hack)
+    tryCleanupZookeeper(config.zkConnect, GROUPID)
+    Thread.sleep(500)
+
+    // consumer properties
+    val consumerProps = new Properties
+    consumerProps.put("groupid", GROUPID)
+    consumerProps.put("zk.connect", config.zkConnect)
+    consumerProps.put("consumer.timeout.ms", "10000")
+    consumerProps.put("autooffset.reset", OffsetRequest.SmallestTimeString)
+    consumerProps.put("fetch.size", (1024*1024).toString)
+    consumerProps.put("socket.buffer.size", (2 * 1024 * 1024).toString)
+    val consumerConfig = new ConsumerConfig(consumerProps)
+    val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+    val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.inputTopic -> config.numThreads))
+    var threadList = List[ZKConsumerThread]()
+    for ((topic, streamList) <- topicMessageStreams)
+      for (stream <- streamList)
+        threadList ::= new ZKConsumerThread(config, stream)
+
+    for (thread <- threadList)
+      thread.start
+
+    threadList.foreach(_.shutdown)
+    consumerConnector.shutdown
+  }
+
+  class Config(args: Array[String]) {
+    val parser = new OptionParser
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+      "Multiple URLS can be given to allow fail-over.")
+      .withRequiredArg
+      .describedAs("zookeeper url")
+      .ofType(classOf[String])
+      .defaultsTo("127.0.0.1:2181")
+    val brokerInfoOpt = parser.accepts("brokerinfo", "REQUIRED: broker info (either a zookeeper connection string or a static broker list).")
+      .withRequiredArg
+      .describedAs("broker.list=brokerid:hostname:port or zk.connect=host:port")
+      .ofType(classOf[String])
+    val inputTopicOpt = parser.accepts("inputtopic", "REQUIRED: The topic to consume from.")
+      .withRequiredArg
+      .describedAs("input-topic")
+      .ofType(classOf[String])
+    val outputTopicOpt = parser.accepts("outputtopic", "REQUIRED: The topic to produce to")
+      .withRequiredArg
+      .describedAs("output-topic")
+      .ofType(classOf[String])
+    val numMessagesOpt = parser.accepts("messages", "The number of messages to send.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(-1)
+    val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
+    val delayMSBtwBatchOpt = parser.accepts("delay-btw-batch-ms", "Delay in ms between 2 batch sends.")
+      .withRequiredArg
+      .describedAs("ms")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(0)
+    val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
+      .withRequiredArg
+      .describedAs("batch size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(200)
+    val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
+      .withRequiredArg
+      .describedAs("threads")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+    val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(5000)
+    val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed")
+      .withRequiredArg
+      .describedAs("compression codec ")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
+
+    val options = parser.parse(args : _*)
+    for(arg <- List(brokerInfoOpt, inputTopicOpt, outputTopicOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+    val zkConnect = options.valueOf(zkConnectOpt)
+    val brokerInfo = options.valueOf(brokerInfoOpt)
+    val numMessages = options.valueOf(numMessagesOpt).intValue
+    val isAsync = options.has(asyncOpt)
+    val delayedMSBtwSend = options.valueOf(delayMSBtwBatchOpt).longValue
+    var batchSize = options.valueOf(batchSizeOpt).intValue
+    val numThreads = options.valueOf(numThreadsOpt).intValue
+    val inputTopic = options.valueOf(inputTopicOpt)
+    val outputTopic = options.valueOf(outputTopicOpt)
+    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
+    val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
+  }
+
+  def tryCleanupZookeeper(zkUrl: String, groupId: String) {
+    try {
+      val dir = "/consumers/" + groupId
+      logger.info("Cleaning up temporary zookeeper data under " + dir + ".")
+      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, StringSerializer)
+      zk.deleteRecursive(dir)
+      zk.close()
+    } catch {
+      case _ => // swallow
+    }
+  }
+
+  class ZKConsumerThread(config: Config, stream: KafkaMessageStream) extends Thread {
+    val shutdownLatch = new CountDownLatch(1)
+    val logger = Logger.getLogger(getClass)
+    val props = new Properties()
+    val brokerInfoList = config.brokerInfo.split("=")
+    if (brokerInfoList(0) == "zk.connect")
+      props.put("zk.connect", brokerInfoList(1))
+    else
+      props.put("broker.list", brokerInfoList(1))
+    props.put("reconnect.interval", Integer.MAX_VALUE.toString)
+    props.put("buffer.size", (64*1024).toString)
+    props.put("compression.codec", config.compressionCodec.codec.toString)
+    props.put("batch.size", config.batchSize.toString)
+
+    if(config.isAsync)
+      props.put("producer.type", "async")
+
+    val producerConfig = new ProducerConfig(props)
+    val producer = new Producer[Message, Message](producerConfig, new DefaultEncoder,
+                                                  new DefaultEventHandler[Message](producerConfig, null),
+                                                  null, new DefaultPartitioner[Message])
+
+    override def run() {
+      logger.info("Starting consumer thread..")
+      var messageCount: Int = 0
+      try {
+        val iter =
+          if(config.numMessages >= 0)
+            stream.slice(0, config.numMessages)
+          else
+            stream
+        for (message <- iter) {
+          try {
+            producer.send(new ProducerData[Message, Message](config.outputTopic, message))
+            if (config.delayedMSBtwSend > 0 && (messageCount + 1) % config.batchSize == 0)
+              Thread.sleep(config.delayedMSBtwSend)
+            messageCount += 1
+          }catch {
+            case ie: Exception => logger.error("Skipping this message", ie)
+          }
+        }
+      }catch {
+        case e: ConsumerTimeoutException => logger.error("consumer thread timing out", e)
+      }
+      logger.info("Sent " + messageCount + " messages")
+      shutdownLatch.countDown
+      logger.info("thread finished execution !" )
+    }
+
+    def shutdown() {
+      shutdownLatch.await
+      producer.close
+    }
+
+  }
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.net.URI
+import joptsimple._
+import kafka.api.FetchRequest
+import kafka.utils._
+import kafka.server._
+import kafka.consumer.SimpleConsumer
+
+/**
+ * Performance test for the simple consumer
+ */
+object SimpleConsumerPerformance {
+
+  def main(args: Array[String]) {
+    
+    val parser = new OptionParser
+    val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
+                           .withRequiredArg
+                           .describedAs("kafka://hostname:port")
+                           .ofType(classOf[String])
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
+                           .withRequiredArg
+                           .describedAs("topic")
+                           .ofType(classOf[String])
+    val fetchSizeOpt = parser.accepts("fetch-size", "REQUIRED: The fetch size to use for consumption.")
+                           .withRequiredArg
+                           .describedAs("bytes")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(1024*1024)
+    
+    val options = parser.parse(args : _*)
+    
+    for(arg <- List(urlOpt, topicOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"") 
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+    
+    val url = new URI(options.valueOf(urlOpt))
+    val topic = options.valueOf(topicOpt)
+    val fetchSize = options.valueOf(fetchSizeOpt).intValue
+    
+    val consumer = new SimpleConsumer(url.getHost, url.getPort, 30*1000, 2*fetchSize)
+    val startMs = System.currentTimeMillis
+    var done = false
+    var totalRead = 0
+    val reportingInterval = 100000
+    var consumedInInterval = 0
+    var offset: Long = 0L
+    while(!done) {
+      val messages = consumer.fetch(new FetchRequest(topic, 0, offset, fetchSize))
+      var messagesRead = 0
+      for(message <- messages)
+        messagesRead += 1
+      
+      if(messagesRead == 0)
+        done = true
+      else
+        offset += messages.validBytes
+      
+      totalRead += messagesRead
+      consumedInInterval += messagesRead
+      
+      if(consumedInInterval > reportingInterval) {
+        println("Bytes read: " + totalRead)
+        consumedInInterval = 0
+      }
+    }
+    val ellapsedSeconds = (System.currentTimeMillis - startMs) / 1000.0
+    println(totalRead + " messages read, " + offset + " bytes")
+    println("Messages/sec: " + totalRead / ellapsedSeconds)
+    println("MB/sec: " + offset / ellapsedSeconds / (1024.0*1024.0))
+    System.exit(0)
+  }
+  
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.net.URI
+import joptsimple._
+import kafka.api.FetchRequest
+import kafka.utils._
+import kafka.consumer._
+import kafka.server._
+
+/**
+ * Command line program to dump out messages to standard out using the simple consumer
+ */
+object SimpleConsumerShell {
+
+  def main(args: Array[String]): Unit = {
+    
+    val parser = new OptionParser
+    val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
+                           .withRequiredArg
+                           .describedAs("kafka://hostname:port")
+                           .ofType(classOf[String])
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
+                           .withRequiredArg
+                           .describedAs("topic")
+                           .ofType(classOf[String])
+    val partitionOpt = parser.accepts("partition", "The partition to consume from.")
+                           .withRequiredArg
+                           .describedAs("partition")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(0)
+    val offsetOpt = parser.accepts("offset", "The offset to start consuming from.")
+                           .withRequiredArg
+                           .describedAs("offset")
+                           .ofType(classOf[java.lang.Long])
+                           .defaultsTo(0L)
+    val fetchsizeOpt = parser.accepts("fetchsize", "The fetch size of each request.")
+                           .withRequiredArg
+                           .describedAs("fetchsize")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(1000000)
+
+    val options = parser.parse(args : _*)
+    
+    for(arg <- List(urlOpt, topicOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"") 
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val url = new URI(options.valueOf(urlOpt))
+    val topic = options.valueOf(topicOpt)
+    val partition = options.valueOf(partitionOpt).intValue
+    val startingOffset = options.valueOf(offsetOpt).longValue
+    val fetchsize = options.valueOf(fetchsizeOpt).intValue
+
+    println("Starting consumer...")
+    val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 64*1024)
+    val thread = Utils.newThread("kafka-consumer", new Runnable() {
+      def run() {
+        var offset = startingOffset
+        while(true) {
+          val fetchRequest = new FetchRequest(topic, partition, offset, fetchsize)
+          val messageSets = consumer.multifetch(fetchRequest)
+          for (messages <- messageSets) {
+            println("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset)
+            var consumed = 0
+            for(messageAndOffset <- messages) {
+              println("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
+              consumed += 1
+            }
+            if(consumed > 0)
+              offset += messages.validBytes
+          }
+          Thread.sleep(10000)
+        }
+      }  
+    }, false);
+    thread.start()
+    thread.join()
+  }
+  
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Annotations.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Annotations.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Annotations.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Annotations.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+/* Some helpful annotations */
+
+/**
+ * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is part of the interface that an
+ * implementation must respect.
+ */
+class threadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is not threadsafe
+ */
+class nonthreadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is immutable
+ */
+class immutable extends StaticAnnotation
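
A minimal sketch of how these marker annotations are meant to be applied (the annotated class names are hypothetical):

    @threadsafe
    class ConnectionPool        // safe to share across threads

    @nonthreadsafe
    class RequestBuffer         // callers must synchronize externally

    @immutable
    case class BrokerId(id: Int)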

Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/DelayedItem.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/DelayedItem.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/DelayedItem.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/DelayedItem.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.util.concurrent._
+import scala.math._
+
+class DelayedItem[T](val item: T, delay: Long, unit: TimeUnit) extends Delayed {
+  
+  val delayMs = unit.toMillis(delay)
+  val createdMs = System.currentTimeMillis
+  
+  def this(item: T, delayMs: Long) = 
+    this(item, delayMs, TimeUnit.MILLISECONDS)
+
+  /**
+   * The remaining delay time
+   */
+  def getDelay(unit: TimeUnit): Long = {
+    val elapsedMs = System.currentTimeMillis - createdMs
+    unit.convert(max(delayMs - elapsedMs, 0), TimeUnit.MILLISECONDS)
+  }
+    
+  def compareTo(d: Delayed): Int = {
+    val delayed = d.asInstanceOf[DelayedItem[T]]
+    val myEnd = createdMs + delayMs
+    val yourEnd = delayed.createdMs + delayed.delayMs
+    
+    if(myEnd < yourEnd) -1
+    else if(myEnd > yourEnd) 1
+    else 0 
+  }
+  
+}
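
Because DelayedItem implements java.util.concurrent.Delayed, it can be placed directly in a DelayQueue; a minimal sketch with arbitrary payloads and delays:

    import java.util.concurrent.{DelayQueue, TimeUnit}

    val queue = new DelayQueue[DelayedItem[String]]()
    queue.add(new DelayedItem("expires later", 5, TimeUnit.SECONDS))
    queue.add(new DelayedItem("expires first", 1, TimeUnit.SECONDS))

    // take() blocks until the head item's delay has elapsed
    println(queue.take().item)    // prints "expires first" after about one second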

Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.io._
+import kafka.message._
+import kafka.utils._
+
+object DumpLogSegments {
+
+  def main(args: Array[String]) {
+    var isNoPrint = false;
+    for(arg <- args)
+      if ("-noprint".compareToIgnoreCase(arg) == 0)
+        isNoPrint = true;
+
+    for(arg <- args) {
+      if (! ("-noprint".compareToIgnoreCase(arg) == 0) ) {
+        val file = new File(arg)
+        println("Dumping " + file)
+        var offset = file.getName().split("\\.")(0).toLong
+        println("Starting offset: " + offset)
+        val messageSet = new FileMessageSet(file, false)
+        for(messageAndOffset <- messageSet) {
+          println("----------------------------------------------")
+          if (messageAndOffset.message.isValid)
+            println("offset:\t" + offset)
+          else
+            println("offset:\t %d \t invalid".format(offset))
+          if (!isNoPrint)
+            println("payload:\t" + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
+          offset += messageAndOffset.offset
+        }
+      }
+    }
+  }
+  
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+class State
+object DONE extends State
+object READY extends State
+object NOT_READY extends State
+object FAILED extends State
+
+/**
+ * Transliteration of the iterator template in Google Collections. To implement an iterator
+ * override makeNext and call allDone() when there are no more items.
+ */
+abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T] {
+  
+  private var state: State = NOT_READY
+  private var nextItem: Option[T] = None
+
+  def next(): T = {
+    if(!hasNext())
+      throw new NoSuchElementException()
+    state = NOT_READY
+    nextItem match {
+      case Some(item) => item
+      case None => throw new IllegalStateException("Expected item but none found.")
+    }
+  }
+  
+  def hasNext(): Boolean = {
+    if(state == FAILED)
+      throw new IllegalStateException("Iterator is in failed state")
+    state match {
+      case DONE => false
+      case READY => true
+      case _ => maybeComputeNext()
+    }
+  }
+  
+  protected def makeNext(): T
+  
+  def maybeComputeNext(): Boolean = {
+    state = FAILED
+    nextItem = Some(makeNext())
+    if(state == DONE) {
+      false
+    } else {
+      state = READY
+      true
+    }
+  }
+  
+  protected def allDone(): T = {
+    state = DONE
+    null.asInstanceOf[T]
+  }
+  
+  def remove = 
+    throw new UnsupportedOperationException("Removal not supported")
+  
+}
+
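
As the comment above says, a subclass implements makeNext() and signals exhaustion with allDone(); a minimal sketch that iterates over an array:

    class ArrayIterator[T](items: Array[T]) extends IteratorTemplate[T] {
      private var pos = 0
      protected def makeNext(): T = {
        if (pos >= items.length)
          allDone()              // flips the state to DONE; the returned placeholder is never handed out
        else {
          val item = items(pos)
          pos += 1
          item
        }
      }
    }

    val it = new ArrayIterator(Array(1, 2, 3))
    while (it.hasNext())
      println(it.next())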

Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import kafka.utils._
+import org.apache.log4j.Logger
+
+/**
+ * A scheduler for running jobs in the background
+ * TODO: ScheduledThreadPoolExecutor notoriously swallows exceptions
+ */
+class KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon: Boolean) {
+  private val logger = Logger.getLogger(getClass())
+  private val threadId = new AtomicLong(0)
+  private val executor = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactory() {
+    def newThread(runnable: Runnable): Thread = {
+      val t = new Thread(runnable, baseThreadName + threadId.getAndIncrement)
+      t.setDaemon(isDaemon)
+      t
+    }
+  })
+  
+  def scheduleWithRate(fun: () => Unit, delayMs: Long, periodMs: Long) =
+    executor.scheduleAtFixedRate(Utils.loggedRunnable(fun), delayMs, periodMs, TimeUnit.MILLISECONDS)
+
+  def shutdown() = {
+    executor.shutdownNow
+    logger.info("shutdown scheduler " + baseThreadName)
+  }
+}
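
A minimal usage sketch with an arbitrary thread count, thread name prefix, and task:

    val scheduler = new KafkaScheduler(1, "background-task-", false)
    // run the task every 30 seconds, starting 30 seconds from now
    scheduler.scheduleWithRate(() => println("periodic work..."), 30 * 1000, 30 * 1000)
    // ... later, on shutdown ...
    scheduler.shutdown()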

Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/MockTime.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/MockTime.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/MockTime.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/MockTime.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.util.concurrent._
+
+class MockTime(@volatile var currentMs: Long) extends Time {
+  
+  def this() = this(System.currentTimeMillis)
+  
+  def milliseconds: Long = currentMs
+
+  def nanoseconds: Long = 
+    TimeUnit.NANOSECONDS.convert(currentMs, TimeUnit.MILLISECONDS)
+
+  def sleep(ms: Long): Unit = 
+    currentMs += ms
+  
+}
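
A minimal sketch of the intent: sleep() advances the mock clock immediately instead of blocking, so time-dependent code can be tested deterministically:

    val time = new MockTime(0L)
    val start = time.milliseconds
    time.sleep(5 * 1000)                        // returns immediately, the clock jumps forward
    assert(time.milliseconds == start + 5000)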

Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Pool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Pool.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Pool.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Pool.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.util.ArrayList
+import java.util.concurrent._
+import collection.JavaConversions
+
+class Pool[K,V] extends Iterable[(K, V)] {
+
+  private val pool = new ConcurrentHashMap[K, V]
+  
+  def this(m: collection.Map[K, V]) {
+    this()
+    for((k,v) <- m.elements)
+      pool.put(k, v)
+  }
+  
+  def put(k: K, v: V) = pool.put(k, v)
+  
+  def putIfNotExists(k: K, v: V) = pool.putIfAbsent(k, v)
+  
+  def contains(id: K) = pool.containsKey(id)
+  
+  def get(key: K): V = pool.get(key)
+  
+  def remove(key: K): V = pool.remove(key)
+  
+  def keys = JavaConversions.asSet(pool.keySet())
+  
+  def values: Iterable[V] = 
+    JavaConversions.asIterable(new ArrayList[V](pool.values()))
+  
+  def clear: Unit = pool.clear()
+  
+  override def size = pool.size
+  
+  override def iterator = new Iterator[(K,V)]() {
+    
+    private val iter = pool.entrySet.iterator
+    
+    def hasNext: Boolean = iter.hasNext
+    
+    def next: (K, V) = {
+      val n = iter.next
+      (n.getKey, n.getValue)
+    }
+    
+  }
+    
+}
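
A minimal usage sketch with arbitrary keys and values; Pool is a thin Scala-friendly wrapper over ConcurrentHashMap:

    val pool = new Pool[String, Int]
    pool.put("a", 1)
    pool.putIfNotExists("a", 2)         // "a" keeps its existing value
    println(pool.get("a"))              // prints 1
    for ((k, v) <- pool)                // Pool is Iterable[(K, V)]
      println(k + " -> " + v)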

Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Range.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Range.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Range.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Range.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import scala.math._
+
+/** 
+ * A generic range value with a start and end 
+ */
+trait Range {
+  /** The first index in the range */
+  def start: Long
+  /** The total number of indexes in the range */
+  def size: Long
+  /** Return true iff the range is empty */
+  def isEmpty: Boolean = size == 0
+
+  /** if value is in range */
+  def contains(value: Long): Boolean = {
+    if( (size == 0 && value == start) ||
+        (size > 0 && value >= start && value <= start + size - 1) )
+      return true
+    else
+      return false
+  }
+  
+  override def toString() = "(start=" + start + ", size=" + size + ")"
+}
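
A minimal sketch instantiating the trait anonymously with arbitrary bounds (this is kafka.utils.Range, not scala.Range):

    // a range covering the ten values 10..19
    val r = new Range {
      val start = 10L
      val size = 10L
    }
    assert(r.contains(10) && r.contains(19))
    assert(!r.contains(20) && !r.isEmpty)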

Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Throttler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Throttler.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Throttler.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Throttler.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import org.apache.log4j.Logger
+import scala.math._
+
+object Throttler {
+  val logger = Logger.getLogger(classOf[Throttler])
+  val DefaultCheckIntervalMs = 100L
+}
+
+/**
+ * A class to measure and throttle the rate of some process. The throttler takes a desired rate-per-second
+ * (the units of the process don't matter, it could be bytes or a count of some other thing), and will sleep for 
+ * an appropriate amount of time when maybeThrottle() is called to attain the desired rate.
+ * 
+ * @param desiredRatePerSec: The rate we want to hit in units/sec
+ * @param checkIntervalMs: The interval at which to check our rate
+ * @param throttleDown: Does throttling increase or decrease our rate?
+ * @param time: The time implementation to use
+ */
+@nonthreadsafe
+class Throttler(val desiredRatePerSec: Double, 
+                val checkIntervalMs: Long, 
+                val throttleDown: Boolean, 
+                val time: Time) {
+  
+  private val lock = new Object
+  private var periodStartNs: Long = time.nanoseconds
+  private var observedSoFar: Double = 0.0
+  
+  def this(desiredRatePerSec: Double, throttleDown: Boolean) = 
+    this(desiredRatePerSec, Throttler.DefaultCheckIntervalMs, throttleDown, SystemTime)
+
+  def this(desiredRatePerSec: Double) = 
+    this(desiredRatePerSec, Throttler.DefaultCheckIntervalMs, true, SystemTime)
+  
+  def maybeThrottle(observed: Double) {
+    lock synchronized {
+      observedSoFar += observed
+      val now = time.nanoseconds
+      val ellapsedNs = now - periodStartNs
+      // if we have completed an interval AND we have observed something, maybe
+      // we should take a little nap
+      if(ellapsedNs > checkIntervalMs * Time.NsPerMs && observedSoFar > 0) {
+        val rateInSecs = (observedSoFar * Time.NsPerSec) / ellapsedNs
+        val needAdjustment = !(throttleDown ^ (rateInSecs > desiredRatePerSec))
+        if(needAdjustment) {
+          // solve for the amount of time to sleep to make us hit the desired rate
+          val desiredRateMs = desiredRatePerSec / Time.MsPerSec.asInstanceOf[Double]
+          val ellapsedMs = ellapsedNs / Time.NsPerMs
+          val sleepTime = round(observedSoFar / desiredRateMs - ellapsedMs)
+          if(sleepTime > 0) {
+            if(Throttler.logger.isDebugEnabled)
+              Throttler.logger.debug("Natural rate is " + rateInSecs + " per second but desired rate is " + desiredRatePerSec + 
+                                     ", sleeping for " + sleepTime + " ms to compensate.")
+            time.sleep(sleepTime)
+          }
+        }
+        periodStartNs = now
+        observedSoFar = 0
+      }
+    }
+  }
+  
+}
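
A minimal usage sketch: cap a loop at roughly 1 MB/sec by reporting the bytes processed in each pass (the chunks here are placeholder buffers):

    val throttler = new Throttler(1024.0 * 1024.0, true)     // desired rate: 1 MB/sec, throttling down
    val chunks = (0 until 100).map(_ => new Array[Byte](64 * 1024))
    for (chunk <- chunks) {
      // ... do the actual work on `chunk` here ...
      throttler.maybeThrottle(chunk.length)                  // sleeps if we are running ahead of the rate
    }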

Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Time.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Time.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Time.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Time.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+/**
+ * Some common constants
+ */
+object Time {
+  val NsPerUs = 1000
+  val UsPerMs = 1000
+  val MsPerSec = 1000
+  val NsPerMs = NsPerUs * UsPerMs
+  val NsPerSec = NsPerMs * MsPerSec
+  val UsPerSec = UsPerMs * MsPerSec
+  val SecsPerMin = 60
+  val MinsPerHour = 60
+  val HoursPerDay = 24
+  val SecsPerHour = SecsPerMin * MinsPerHour
+  val SecsPerDay = SecsPerHour * HoursPerDay
+  val MinsPerDay = MinsPerHour * HoursPerDay
+}
+
+/**
+ * A mockable interface for time functions
+ */
+trait Time {
+  
+  def milliseconds: Long
+
+  def nanoseconds: Long
+
+  def sleep(ms: Long)
+}
+
+/**
+ * The normal system implementation of time functions
+ */
+object SystemTime extends Time {
+  
+  def milliseconds: Long = System.currentTimeMillis
+  
+  def nanoseconds: Long = System.nanoTime
+  
+  def sleep(ms: Long): Unit = Thread.sleep(ms)
+  
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import org.I0Itec.zkclient.ZkClient
+import kafka.consumer.{SimpleConsumer, ConsumerConfig}
+import kafka.cluster.Partition
+import kafka.api.OffsetRequest
+
+/**
+ *  A utility that updates the offset of every broker partition to the offset of latest log segment file, in ZK.
+ */
+object UpdateOffsetsInZK {
+  val Earliest = "earliest"
+  val Latest = "latest"
+
+  def main(args: Array[String]) {
+    if(args.length < 3)
+      usage
+    val config = new ConsumerConfig(Utils.loadProps(args(1)))
+    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
+        config.zkConnectionTimeoutMs, StringSerializer)
+    args(0) match {
+      case Earliest => getAndSetOffsets(zkClient, OffsetRequest.EarliestTime, config, args(2))
+      case Latest => getAndSetOffsets(zkClient, OffsetRequest.LatestTime, config, args(2))
+      case _ => usage
+    }
+  }
+
+  private def getAndSetOffsets(zkClient: ZkClient, offsetOption: Long, config: ConsumerConfig, topic: String): Unit = {
+    val cluster = ZkUtils.getCluster(zkClient)
+    val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic).iterator)
+    var partitions: List[String] = Nil
+
+    partitionsPerTopicMap.get(topic) match {
+      case Some(l) =>  partitions = l.sortWith((s,t) => s < t)
+      case _ => throw new RuntimeException("Can't find topic " + topic)
+    }
+
+    var numParts = 0
+    for (partString <- partitions) {
+      val part = Partition.parse(partString)
+      val broker = cluster.getBroker(part.brokerId)
+      val consumer = new SimpleConsumer(broker.host, broker.port, 10000, 100 * 1024)
+      val offsets = consumer.getOffsetsBefore(topic, part.partId, offsetOption, 1)
+      val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
+      
+      println("updating partition " + part.name + " with new offset: " + offsets(0))
+      ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + part.name, offsets(0).toString)
+      numParts += 1
+    }
+    println("updated the offset for " + numParts + " partitions")    
+  }
+
+  private def usage() = {
+    println("USAGE: " + UpdateOffsetsInZK.getClass.getName + " [earliest | latest] consumer.properties topic")
+    System.exit(1)
+  }
+}
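
A short usage sketch for the tool above (the group, topic, and partition values are made-up examples): run with the arguments shown in usage(), it rewrites one offset node per partition under the consumer group's offsets directory, e.g. /consumers/group1/offsets/my-topic/0-3 for broker 0, partition 3.

    // Programmatic equivalent of the command-line invocation described in usage()
    UpdateOffsetsInZK.main(Array("latest", "consumer.properties", "my-topic"))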

Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,684 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.io._
+import java.nio._
+import java.nio.channels._
+import java.util.concurrent.atomic._
+import java.lang.management._
+import java.util.zip.CRC32
+import org.apache.log4j.Logger
+import javax.management._
+import java.util.Properties
+import scala.collection._
+import scala.collection.mutable
+import kafka.message.{NoCompressionCodec, CompressionCodec}
+
+/**
+ * Helper functions!
+ */
+object Utils {
+  private val logger = Logger.getLogger(getClass())
+  
+  /**
+   * Wrap the given function in a java.lang.Runnable
+   * @param fun A function
+   * @return A Runnable that just executes the function
+   */
+  def runnable(fun: () => Unit): Runnable = 
+    new Runnable() {
+      def run() = fun()
+    }
+  
+  /**
+   * Wrap the given function in a java.lang.Runnable that logs any errors encountered
+   * @param fun A function
+   * @return A Runnable that just executes the function
+   */
+  def loggedRunnable(fun: () => Unit): Runnable =
+    new Runnable() {
+      def run() = {
+        try {
+          fun()
+        }
+        catch {
+          case t =>
+            // log any error and the stack trace
+            logger.error(t, t)
+            logger.error(stackTrace(t), t)
+        }
+      }
+    }
+
+  /**
+   * Create a daemon thread
+   * @param name The name of the thread
+   * @param runnable The runnable to execute in the background
+   * @return The unstarted thread
+   */
+  def daemonThread(name: String, runnable: Runnable): Thread = 
+    newThread(name, runnable, true)
+  
+  /**
+   * Create a daemon thread
+   * @param name The name of the thread
+   * @param fun The function to execute in the thread
+   * @return The unstarted thread
+   */
+  def daemonThread(name: String, fun: () => Unit): Thread = 
+    daemonThread(name, runnable(fun))
+  
+  /**
+   * Create a new thread
+   * @param name The name of the thread
+   * @param runnable The work for the thread to do
+   * @param daemon Whether the thread should be a daemon thread (daemon threads do not block JVM shutdown)
+   * @return The unstarted thread
+   */
+  def newThread(name: String, runnable: Runnable, daemon: Boolean): Thread = {
+    val thread = new Thread(runnable, name) 
+    thread.setDaemon(daemon)
+    thread
+  }
+   
+  /**
+   * Read a byte array from the given offset and size in the buffer
+   * TODO: Should use System.arraycopy
+   */
+  def readBytes(buffer: ByteBuffer, offset: Int, size: Int): Array[Byte] = {
+    val bytes = new Array[Byte](size)
+    var i = 0
+    while(i < size) {
+      bytes(i) = buffer.get(offset + i)
+      i += 1
+    }
+    bytes
+  }
+  
+  /**
+   * Read size prefixed string where the size is stored as a 2 byte short.
+   * @param buffer The buffer to read from
+   * @param encoding The encoding in which to read the string
+   */
+  def readShortString(buffer: ByteBuffer, encoding: String): String = {
+    val size: Int = buffer.getShort()
+    if(size < 0)
+      return null
+    val bytes = new Array[Byte](size)
+    buffer.get(bytes)
+    new String(bytes, encoding)
+  }
+  
+  /**
+   * Write a size prefixed string where the size is stored as a 2 byte short
+   * @param buffer The buffer to write to
+   * @param string The string to write
+   * @param encoding The encoding in which to write the string
+   */
+  def writeShortString(buffer: ByteBuffer, string: String, encoding: String): Unit = {
+    if(string == null) {
+      buffer.putShort(-1)
+    } else {
+      val bytes = string.getBytes(encoding)
+      if(bytes.length > Short.MaxValue)
+        throw new IllegalArgumentException("String exceeds the maximum size of " + Short.MaxValue + ".")
+      buffer.putShort(bytes.length.asInstanceOf[Short])
+      buffer.put(bytes)
+    }
+  }
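+  // Example (sketch) of the size-prefixed framing used by readShortString/writeShortString:
+  //   val buf = java.nio.ByteBuffer.allocate(16)
+  //   Utils.writeShortString(buf, "topic-a", "UTF-8")  // 2 byte length, then the encoded bytes
+  //   buf.flip()
+  //   Utils.readShortString(buf, "UTF-8")              // == "topic-a"; a -1 length decodes as null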
+  
+  /**
+   * Read a properties file from the given path
+   * @param filename The path of the file to read
+   */
+  def loadProps(filename: String): Properties = {
+    val propStream = new FileInputStream(filename)
+    try {
+      val props = new Properties()
+      props.load(propStream)
+      props
+    } finally {
+      propStream.close()
+    }
+  }
+  
+  /**
+   * Read a required integer property value or throw an exception if no such property is found
+   */
+  def getInt(props: Properties, name: String): Int = {
+    if(props.containsKey(name))
+      return getInt(props, name, -1)
+    else
+      throw new IllegalArgumentException("Missing required property '" + name + "'")
+  }
+  
+  /**
+   * Read an integer from the properties instance
+   * @param props The properties to read from
+   * @param name The property name
+   * @param default The default value to use if the property is not found
+   * @return the integer value
+   */
+  def getInt(props: Properties, name: String, default: Int): Int = 
+    getIntInRange(props, name, default, (Int.MinValue, Int.MaxValue))
+  
+  /**
+   * Read an integer from the properties instance. Throw an exception 
+   * if the value is not in the given range (inclusive)
+   * @param props The properties to read from
+   * @param name The property name
+   * @param default The default value to use if the property is not found
+   * @param range The range in which the value must fall (inclusive)
+   * @throws IllegalArgumentException If the value is not in the given range
+   * @return the integer value
+   */
+  def getIntInRange(props: Properties, name: String, default: Int, range: (Int, Int)): Int = {
+    val v = 
+      if(props.containsKey(name))
+        props.getProperty(name).toInt
+      else
+        default
+    if(v < range._1 || v > range._2)
+      throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range + ".")
+    else
+      v
+  }
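+  // Example (sketch): given a Properties holding "num.partitions=3",
+  //   Utils.getInt(props, "num.partitions", 1)                    // == 3
+  //   Utils.getIntInRange(props, "num.partitions", 1, (1, 1024))  // == 3, or throws if out of range
+  //   Utils.getInt(props, "missing.key")                          // throws IllegalArgumentException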
+  
+  /**
+   * Read a boolean value from the properties instance
+   * @param props The properties to read from
+   * @param name The property name
+   * @param default The default value to use if the property is not found
+   * @return the boolean value
+   */
+  def getBoolean(props: Properties, name: String, default: Boolean): Boolean = {
+    if(!props.containsKey(name))
+      default
+    else if("true" == props.getProperty(name))
+      true
+    else if("false" == props.getProperty(name))
+      false
+    else
+      throw new IllegalArgumentException("Unacceptable value for property '" + name + "', boolean values must be either 'true' or 'false" )
+  }
+  
+  /**
+   * Get a string property, or, if no such property is defined, return the given default value
+   */
+  def getString(props: Properties, name: String, default: String): String = {
+    if(props.containsKey(name))
+      props.getProperty(name)
+    else
+      default
+  }
+  
+  /**
+   * Get a string property or throw an exception if no such property is defined.
+   */
+  def getString(props: Properties, name: String): String = {
+    if(props.containsKey(name))
+      props.getProperty(name)
+    else
+      throw new IllegalArgumentException("Missing required property '" + name + "'")
+  }
+
+  /**
+   * Get a property of type java.util.Properties or throw an exception if no such property is defined.
+   */
+  def getProps(props: Properties, name: String): Properties = {
+    if(props.containsKey(name)) {
+      val propString = props.getProperty(name)
+      val propValues = propString.split(",")
+      val properties = new Properties
+      for(i <- 0 until propValues.length) {
+        val prop = propValues(i).split("=")
+        if(prop.length != 2)
+          throw new IllegalArgumentException("Illegal format of specifying properties '" + propValues(i) + "'")
+        properties.put(prop(0), prop(1))
+      }
+      properties
+    }
+    else
+      throw new IllegalArgumentException("Missing required property '" + name + "'")
+  }
+
+  /**
+   * Get a property of type java.util.Properties or return the default if no such property is defined
+   */
+  def getProps(props: Properties, name: String, default: Properties): Properties = {
+    if(props.containsKey(name)) {
+      val propString = props.getProperty(name)
+      val propValues = propString.split(",")
+      if(propValues.length < 1)
+        throw new IllegalArgumentException("Illegal format of specifying properties '" + propString + "'")
+      val properties = new Properties
+      for(i <- 0 until propValues.length) {
+        val prop = propValues(i).split("=")
+        if(prop.length != 2)
+          throw new IllegalArgumentException("Illegal format of specifying properties '" + propValues(i) + "'")
+        properties.put(prop(0), prop(1))
+      }
+      properties
+    }
+    else
+      default
+  }
+
+  /**
+   * Open a channel for the given file
+   */
+  def openChannel(file: File, mutable: Boolean): FileChannel = {
+    if(mutable)
+      new RandomAccessFile(file, "rw").getChannel()
+    else
+      new FileInputStream(file).getChannel()
+  }
+  
+  /**
+   * Do the given action and log any exceptions thrown without rethrowing them
+   * @param log The log method to use for logging. E.g. logger.warn
+   * @param action The action to execute
+   */
+  def swallow(log: (Object, Throwable) => Unit, action: => Unit) = {
+    try {
+      action
+    } catch {
+      case e: Throwable => log(e.getMessage(), e)
+    }
+  }
+  
+  /**
+   * Test if two byte buffers are equal. In this case equality means having
+   * the same bytes from the current position to the limit
+   */
+  def equal(b1: ByteBuffer, b2: ByteBuffer): Boolean = {
+    // two byte buffers are equal if their position is the same,
+    // their remaining bytes are the same, and their contents are the same
+    if(b1.position != b2.position)
+      return false
+    if(b1.remaining != b2.remaining)
+      return false
+    for(i <- 0 until b1.remaining)
+      if(b1.get(b1.position + i) != b2.get(b2.position + i))
+        return false
+    return true
+  }
+  
+  /**
+   * Translate the given buffer into a string
+   * @param buffer The buffer to translate
+   * @param encoding The encoding to use in translating bytes to characters
+   */
+  def toString(buffer: ByteBuffer, encoding: String): String = {
+    val bytes = new Array[Byte](buffer.remaining)
+    buffer.get(bytes)
+    new String(bytes, encoding)
+  }
+  
+  /**
+   * Print an error message and shut down the JVM
+   * @param message The error message
+   */
+  def croak(message: String) {
+    System.err.println(message)
+    System.exit(1)
+  }
+  
+  /**
+   * Recursively delete the given file/directory and any subfiles (if any exist)
+   * @param file The root file at which to begin deleting
+   */
+  def rm(file: String): Unit = rm(new File(file))
+  
+  /**
+   * Recursively delete the given file/directory and any subfiles (if any exist)
+   * @param file The root file at which to begin deleting
+   */
+  def rm(file: File): Unit = {
+    if(file == null) {
+      return
+    } else if(file.isDirectory) {
+      val files = file.listFiles()
+      if(files != null) {
+        for(f <- files)
+          rm(f)
+      }
+      file.delete()
+    } else {
+      file.delete()
+    }
+  }
+  
+  /**
+   * Register the given mbean with the platform mbean server,
+   * unregistering any mbean that was there before
+   * @param mbean The object to register as an mbean
+   * @param name The name to register this mbean with
+   */
+  def registerMBean(mbean: Object, name: String) {
+    val mbs = ManagementFactory.getPlatformMBeanServer()
+    mbs synchronized {
+      val objName = new ObjectName(name)
+      if(mbs.isRegistered(objName))
+        mbs.unregisterMBean(objName)
+      mbs.registerMBean(mbean, objName)
+    }
+  }
+  
+  /**
+   * Unregister the mbean with the given name, if there is one registered
+   * @param name The mbean name to unregister
+   */
+  def unregisterMBean(name: String) {
+    val mbs = ManagementFactory.getPlatformMBeanServer()
+    mbs synchronized {
+      val objName = new ObjectName(name)
+      if(mbs.isRegistered(objName))
+        mbs.unregisterMBean(objName)
+    }
+  }
+  
+  /**
+   * Read an unsigned integer from the current position in the buffer, 
+   * incrementing the position by 4 bytes
+   * @param buffer The buffer to read from
+   * @return The integer read, as a long to avoid signedness
+   */
+  def getUnsignedInt(buffer: ByteBuffer): Long = 
+    buffer.getInt() & 0xffffffffL
+  
+  /**
+   * Read an unsigned integer from the given position without modifying the buffer's
+   * position
+   * @param buffer The buffer to read from
+   * @param index The index from which to read the integer
+   * @return The integer read, as a long to avoid signedness
+   */
+  def getUnsignedInt(buffer: ByteBuffer, index: Int): Long = 
+    buffer.getInt(index) & 0xffffffffL
+  
+  /**
+   * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
+   * @param buffer The buffer to write to
+   * @param value The value to write
+   */
+  def putUnsignedInt(buffer: ByteBuffer, value: Long): Unit = 
+    buffer.putInt((value & 0xffffffffL).asInstanceOf[Int])
+  
+  /**
+   * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
+   * @param buffer The buffer to write to
+   * @param index The position in the buffer at which to begin writing
+   * @param value The value to write
+   */
+  def putUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit = 
+    buffer.putInt(index, (value & 0xffffffffL).asInstanceOf[Int])
+  
+  /**
+   * Compute the CRC32 of the byte array
+   * @param bytes The array to compute the checksum for
+   * @return The CRC32
+   */
+  def crc32(bytes: Array[Byte]): Long = crc32(bytes, 0, bytes.length)
+  
+  /**
+   * Compute the CRC32 of the segment of the byte array given by the specified size and offset
+   * @param bytes The bytes to checksum
+   * @param offset The offset at which to begin checksumming
+   * @param size The number of bytes to checksum
+   * @return The CRC32
+   */
+  def crc32(bytes: Array[Byte], offset: Int, size: Int): Long = {
+    val crc = new CRC32()
+    crc.update(bytes, offset, size)
+    crc.getValue()
+  }
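+  // Example (sketch): checksumming a message payload
+  //   val payload = "hello".getBytes("UTF-8")
+  //   Utils.crc32(payload)                     // CRC32 of the whole array, returned as a Long
+  //   Utils.crc32(payload, 0, payload.length)  // same value, explicit offset and size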
+  
+  /**
+   * Compute the hash code for the given items
+   */
+  def hashcode(as: Any*): Int = {
+    if(as == null)
+      return 0
+    var h = 1
+    var i = 0
+    while(i < as.length) {
+      if(as(i) != null)
+        h = 31 * h + as(i).hashCode
+      i += 1
+    }
+    return h
+  }
+  
+  /**
+   * Group the given values by keys extracted with the given function
+   */
+  def groupby[K,V](vals: Iterable[V], f: V => K): Map[K,List[V]] = {
+    val m = new mutable.HashMap[K, List[V]]
+    for(v <- vals) {
+      val k = f(v)
+      m.get(k) match {
+        case Some(l: List[V]) => m.put(k, v :: l)
+        case None => m.put(k, List(v))
+      }
+    } 
+    m
+  }
+  
+  /**
+   * Read some bytes into the provided buffer, and return the number of bytes read. If the 
+   * channel has been closed or we get -1 on the read for any reason, throw an EOFException
+   */
+  def read(channel: ReadableByteChannel, buffer: ByteBuffer): Int = {
+    channel.read(buffer) match {
+      case -1 => throw new EOFException("Received -1 when reading from channel, socket has likely been closed.")
+      case n: Int => n
+    }
+  } 
+  
+  def notNull[V](v: V) = {
+    if(v == null)
+      throw new IllegalArgumentException("Value cannot be null.")
+    else
+      v
+  }
+
+  def getHostPort(hostport: String) : Tuple2[String, Int] = {
+    val splits = hostport.split(":")
+    (splits(0), splits(1).toInt)
+  }
+
+  def getTopicPartition(topicPartition: String) : Tuple2[String, Int] = {
+    val index = topicPartition.lastIndexOf('-')
+    (topicPartition.substring(0,index), topicPartition.substring(index+1).toInt)
+  }
+
+  def stackTrace(e: Throwable): String = {
+    val sw = new StringWriter
+    val pw = new PrintWriter(sw)
+    e.printStackTrace(pw)
+    sw.toString()
+  }
+
+  /**
+   * This method takes a comma-separated list of key:value pairs and returns them as a map.
+   * The format of allCSVals is key1:val1, key2:val2 ....
+   */
+  private def getCSVMap[K, V](allCSVals: String, exceptionMsg:String, successMsg:String) :Map[K, V] = {
+    val map = new mutable.HashMap[K, V]
+    if("".equals(allCSVals))
+      return map
+    val csVals = allCSVals.split(",")
+    for(i <- 0 until csVals.length)
+    {
+     try{
+      val tempSplit = csVals(i).split(":")
+      logger.info(successMsg + tempSplit(0) + " : " + Integer.parseInt(tempSplit(1).trim))
+      map += tempSplit(0).asInstanceOf[K] -> Integer.parseInt(tempSplit(1).trim).asInstanceOf[V]
+      } catch {
+          case _ =>  logger.error(exceptionMsg + ": " + csVals(i))
+        }
+    }
+    map
+  }
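+  // Example (sketch) of the format parsed by getCSVMap, as used by the helpers below:
+  //   "topicA:1,topicB:2"  ->  Map("topicA" -> 1, "topicB" -> 2)
+  //   a malformed token such as "topicC" is logged with exceptionMsg and skipped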
+
+  def getCSVList(csvList: String): Seq[String] = {
+    if(csvList == null)
+      Seq.empty[String]
+    else {
+      csvList.split(",").filter(v => !v.equals(""))
+    }
+  }
+
+  def getTopicRentionHours(retentionHours: String) : Map[String, Int] = {
+    val exceptionMsg = "Malformed token for topic.log.retention.hours in server.properties: "
+    val successMsg =  "The retention hour for "
+    getCSVMap(retentionHours, exceptionMsg, successMsg)
+  }
+
+  def getTopicFlushIntervals(allIntervals: String) : Map[String, Int] = {
+    val exceptionMsg = "Malformed token for topic.flush.Intervals.ms in server.properties: "
+    val successMsg =  "The flush interval for "
+    getCSVMap(allIntervals, exceptionMsg, successMsg)
+  }
+
+  def getTopicPartitions(allPartitions: String) : Map[String, Int] = {
+    val exceptionMsg = "Malformed token for topic.partition.counts in server.properties: "
+    val successMsg =  "The number of partitions for topic  "
+    getCSVMap(allPartitions, exceptionMsg, successMsg)
+  }
+
+  def getConsumerTopicMap(consumerTopicString: String) : Map[String, Int] = {
+    val exceptionMsg = "Malformed token for embeddedconsumer.topics in consumer.properties: "
+    val successMsg =  "The number of consumer thread for topic  "
+    getCSVMap(consumerTopicString, exceptionMsg, successMsg)
+  }
+
+  def getObject[T<:AnyRef](className: String): T = {
+    className match {
+      case null => null.asInstanceOf[T]
+      case _ =>
+        val clazz = Class.forName(className)
+        val clazzT = clazz.asInstanceOf[Class[T]]
+        val constructors = clazzT.getConstructors
+        require(constructors.length == 1)
+        constructors.head.newInstance().asInstanceOf[T]
+    }
+  }
+
+  def propertyExists(prop: String): Boolean = {
+    if(prop == null)
+      false
+    else if(prop.compareTo("") == 0)
+      false
+    else true
+  }
+
+  def getCompressionCodec(props: Properties, codec: String): CompressionCodec = {
+    val codecValueString = props.getProperty(codec)
+    if(codecValueString == null)
+      NoCompressionCodec
+    else
+      CompressionCodec.getCompressionCodec(codecValueString.toInt)
+  }
+}
+
+class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {
+  private val time: Time = SystemTime
+
+  private val complete = new AtomicReference(new Stats())
+  private val current = new AtomicReference(new Stats())
+  private val numCumulatedRequests = new AtomicLong(0)
+
+  def recordRequestMetric(requestNs: Long) {
+    val stats = current.get
+    stats.add(requestNs)
+    numCumulatedRequests.getAndAdd(1)
+    val ageNs = time.nanoseconds - stats.start
+    // if the current stats are too old it is time to swap
+    if(ageNs >= monitorDurationNs) {
+      val swapped = current.compareAndSet(stats, new Stats())
+      if(swapped) {
+        complete.set(stats)
+        stats.end.set(time.nanoseconds)
+      }
+    }
+  }
+
+  def recordThroughputMetric(data: Long) {
+    val stats = current.get
+    stats.addData(data)
+    val ageNs = time.nanoseconds - stats.start
+    // if the current stats are too old it is time to swap
+    if(ageNs >= monitorDurationNs) {
+      val swapped = current.compareAndSet(stats, new Stats())
+      if(swapped) {
+        complete.set(stats)
+        stats.end.set(time.nanoseconds)
+      }
+    }
+  }
+
+  def getNumRequests(): Long = numCumulatedRequests.get
+
+  def getRequestsPerSecond: Double = {
+    val stats = complete.get
+    stats.numRequests / stats.durationSeconds
+  }
+
+  def getThroughput: Double = {
+    val stats = complete.get
+    stats.totalData / stats.durationSeconds
+  }
+
+  def getAvgMetric: Double = {
+    val stats = complete.get
+    if (stats.numRequests == 0) {
+      0
+    }
+    else {
+      stats.totalRequestMetric.toDouble / stats.numRequests
+    }
+  }
+
+  def getMaxMetric: Double = complete.get.maxRequestMetric
+
+  class Stats {
+    val start = time.nanoseconds
+    var end = new AtomicLong(-1)
+    var numRequests = 0
+    var totalRequestMetric: Long = 0L
+    var maxRequestMetric: Long = 0L
+    var totalData: Long = 0L
+    private val lock = new Object()
+
+    def addData(data: Long) {
+      lock synchronized {
+        totalData += data
+      }
+    }
+
+    def add(requestNs: Long) {
+      lock synchronized {
+        numRequests +=1
+        totalRequestMetric += requestNs
+        maxRequestMetric = scala.math.max(maxRequestMetric, requestNs)
+      }
+    }
+
+    def durationSeconds: Double = (end.get - start) / (1000.0 * 1000.0 * 1000.0)
+
+    def durationMs: Double = (end.get - start) / (1000.0 * 1000.0)
+  }
+}
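
A brief sketch of how SnapshotStats above is meant to be driven; the one-minute window and byte count are illustrative values:

    val stats = new SnapshotStats(60L * 1000L * 1000L * 1000L)   // keep stats over a 1 minute window
    val startNs = SystemTime.nanoseconds
    // ... handle one request ...
    stats.recordRequestMetric(SystemTime.nanoseconds - startNs)  // request latency in nanoseconds
    stats.recordThroughputMetric(4096)                           // bytes moved by the request
    val avgNs = stats.getAvgMetric                               // averages come from the last completed window
    val maxNs = stats.getMaxMetric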

Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,280 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import org.I0Itec.zkclient.ZkClient
+import org.I0Itec.zkclient.serialize.ZkSerializer
+import kafka.cluster.{Broker, Cluster}
+import scala.collection._
+import java.util.Properties
+import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
+import org.apache.log4j.Logger
+
+object ZkUtils {
+  val ConsumersPath = "/consumers"
+  val BrokerIdsPath = "/brokers/ids"
+  val BrokerTopicsPath = "/brokers/topics"
+  private val logger = Logger.getLogger(getClass())  
+
+  /**
+   *  Make sure a persistent path exists in ZK, creating it if it does not exist.
+   */
+  def makeSurePersistentPathExists(client: ZkClient, path: String) {
+    if (!client.exists(path))
+      client.createPersistent(path, true) // won't throw NoNodeException or NodeExistsException
+  }
+
+  /**
+   *  create the parent path
+   */
+  private def createParentPath(client: ZkClient, path: String): Unit = {
+    val parentDir = path.substring(0, path.lastIndexOf('/'))
+    if (parentDir.length != 0)
+      client.createPersistent(parentDir, true)
+  }
+
+  /**
+   * Create an ephemeral node with the given path and data. Create parents if necessary.
+   */
+  private def createEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
+    try {
+      client.createEphemeral(path, data)
+    }
+    catch {
+      case e: ZkNoNodeException => {
+        createParentPath(client, path)
+        client.createEphemeral(path, data)
+      }
+    }
+  }
+
+  /**
+   * Create an ephemeral node with the given path and data.
+   * Throws a ZkNodeExistsException if the node already exists.
+   */
+  def createEphemeralPathExpectConflict(client: ZkClient, path: String, data: String): Unit = {
+    try {
+      createEphemeralPath(client, path, data)
+    }
+    catch {
+      case e: ZkNodeExistsException => {
+        // this can happen when there is connection loss; make sure the data is what we intend to write
+        var storedData: String = null
+        try {
+          storedData = readData(client, path)
+        }
+        catch {
+          case e1: ZkNoNodeException => // the node disappeared; treat as if the node existed and let the caller handle it
+          case e2 => throw e2
+        }
+        if (storedData == null || storedData != data) {
+          logger.info("conflict in " + path + " data: " + data + " stored data: " + storedData)
+          throw e
+        }
+        else {
+          // otherwise, the creation succeeded, return normally
+          logger.info(path + " exists with value " + data + " during connection loss; this is ok")
+        }
+      }
+      case e2 => throw e2
+    }
+  }
+
+  /**
+   * Update the value of a persistent node with the given path and data.
+   * Create the parent directory if necessary. Never throws a ZkNodeExistsException.
+   */
+  def updatePersistentPath(client: ZkClient, path: String, data: String): Unit = {
+    try {
+      client.writeData(path, data)
+    }
+    catch {
+      case e: ZkNoNodeException => {
+        createParentPath(client, path)
+        try {
+          client.createPersistent(path, data)
+        }
+        catch {
+          case e: ZkNodeExistsException => client.writeData(path, data)
+          case e2 => throw e2
+        }
+      }
+      case e2 => throw e2
+    }
+  }
+
+  /**
+   * Update the value of an ephemeral node with the given path and data.
+   * Create the parent directory if necessary.
+   */
+  def updateEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
+    try {
+      client.writeData(path, data)
+    }
+    catch {
+      case e: ZkNoNodeException => {
+        createParentPath(client, path)
+        client.createEphemeral(path, data)
+      }
+      case e2 => throw e2
+    }
+  }
+
+  def deletePath(client: ZkClient, path: String) {
+    try {
+      client.delete(path)
+    }
+    catch {
+      case e: ZkNoNodeException =>
+        // this can happen during a connection loss event, return normally
+        logger.info(path + " deleted during connection loss; this is ok")
+      case e2 => throw e2
+    }
+  }
+
+  def deletePathRecursive(client: ZkClient, path: String) {
+    try {
+      client.deleteRecursive(path)
+    }
+    catch {
+      case e: ZkNoNodeException =>
+        // this can happen during a connection loss event, return normally
+        logger.info(path + " deleted during connection loss; this is ok")
+      case e2 => throw e2
+    }
+  }
+
+  def readData(client: ZkClient, path: String): String = {
+    client.readData(path)
+  }
+
+  def readDataMaybeNull(client: ZkClient, path: String): String = {
+    client.readData(path, true)
+  }
+
+  def getChildren(client: ZkClient, path: String): Seq[String] = {
+    import scala.collection.JavaConversions._
+    // triggers implicit conversion from java list to scala Seq
+    client.getChildren(path)
+  }
+
+  def getChildrenParentMayNotExist(client: ZkClient, path: String): Seq[String] = {
+    import scala.collection.JavaConversions._
+    // triggers implicit conversion from java list to scala Seq
+
+    var ret: java.util.List[String] = null
+    try {
+      ret = client.getChildren(path)
+    }
+    catch {
+      case e: ZkNoNodeException =>
+        return Nil
+      case e2 => throw e2
+    }
+    return ret
+  }
+
+  /**
+   * Check if the given path exists
+   */
+  def pathExists(client: ZkClient, path: String): Boolean = {
+    client.exists(path)
+  }
+
+  def getLastPart(path : String) : String = path.substring(path.lastIndexOf('/') + 1)
+
+  def getCluster(zkClient: ZkClient) : Cluster = {
+    val cluster = new Cluster
+    val nodes = getChildrenParentMayNotExist(zkClient, BrokerIdsPath)
+    for (node <- nodes) {
+      val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node)
+      cluster.add(Broker.createBroker(node.toInt, brokerZKString))
+    }
+    cluster
+  }
+
+  def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, List[String]] = {
+    val ret = new mutable.HashMap[String, List[String]]()
+    for (topic <- topics) {
+      var partList: List[String] = Nil
+      val brokers = getChildrenParentMayNotExist(zkClient, BrokerTopicsPath + "/" + topic)
+      for (broker <- brokers) {
+        val nParts = readData(zkClient, BrokerTopicsPath + "/" + topic + "/" + broker).toInt
+        for (part <- 0 until nParts)
+          partList ::= broker + "-" + part
+      }
+      partList = partList.sortWith((s,t) => s < t)
+      ret += (topic -> partList)
+    }
+    ret
+  }
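+  // Example (sketch): with two brokers (0 and 1) each hosting 2 partitions of "topicA",
+  // getPartitionsForTopics(zkClient, List("topicA").iterator) returns
+  //   Map("topicA" -> List("0-0", "0-1", "1-0", "1-1"))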
+
+  def setupPartition(zkClient : ZkClient, brokerId: Int, host: String, port: Int, topic: String, nParts: Int) {
+    val brokerIdPath = BrokerIdsPath + "/" + brokerId
+    val broker = new Broker(brokerId, brokerId.toString, host, port)
+    createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
+    val brokerPartTopicPath = BrokerTopicsPath + "/" + topic + "/" + brokerId
+    createEphemeralPathExpectConflict(zkClient, brokerPartTopicPath, nParts.toString)    
+  }
+
+  def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) {
+    val brokerIdPath = BrokerIdsPath + "/" + brokerId
+    zkClient.delete(brokerIdPath)
+    val brokerPartTopicPath = BrokerTopicsPath + "/" + topic + "/" + brokerId
+    zkClient.delete(brokerPartTopicPath)
+  }
+}
+
+object StringSerializer extends ZkSerializer {
+
+  @throws(classOf[ZkMarshallingError])
+  def serialize(data : Object) : Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8")
+
+  @throws(classOf[ZkMarshallingError])
+  def deserialize(bytes : Array[Byte]) : Object = {
+    if (bytes == null)
+      null
+    else
+      new String(bytes, "UTF-8")
+  }
+}
+
+class ZKGroupDirs(val group: String) {
+  def consumerDir = ZkUtils.ConsumersPath
+  def consumerGroupDir = consumerDir + "/" + group
+  def consumerRegistryDir = consumerGroupDir + "/ids"
+}
+
+class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group) {
+  def consumerOffsetDir = consumerGroupDir + "/offsets/" + topic
+  def consumerOwnerDir = consumerGroupDir + "/owners/" + topic
+}
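+
+// Example (sketch): new ZKGroupTopicDirs("group1", "topicA").consumerOffsetDir
+//   yields "/consumers/group1/offsets/topicA", and consumerOwnerDir likewise
+//   yields "/consumers/group1/owners/topicA".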
+
+
+class ZKConfig(props: Properties) {
+  /** ZK host string */
+  val zkConnect = Utils.getString(props, "zk.connect", null)
+
+  /** zookeeper session timeout */
+  val zkSessionTimeoutMs = Utils.getInt(props, "zk.sessiontimeout.ms", 6000)
+
+  /** the max time that the client waits to establish a connection to zookeeper */
+  val zkConnectionTimeoutMs = Utils.getInt(props, "zk.connectiontimeout.ms", zkSessionTimeoutMs)
+
+  /** how far a ZK follower can be behind a ZK leader */
+  val zkSyncTimeMs = Utils.getInt(props, "zk.synctime.ms", 2000)
+}