Posted to commits@kafka.apache.org by ne...@apache.org on 2014/03/07 03:09:00 UTC

git commit: KAFKA-1281 add the new producer to existing tools; reviewed by Jun Rao and Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk c3520fe7e -> 74c54c7ee


KAFKA-1281 add the new producer to existing tools; reviewed by Jun Rao and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/74c54c7e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/74c54c7e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/74c54c7e

Branch: refs/heads/trunk
Commit: 74c54c7eeb236cbf66710751165ea9f632cf3f52
Parents: c3520fe
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Mar 6 18:08:55 2014 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Mar 6 18:08:55 2014 -0800

----------------------------------------------------------------------
 .../kafka/clients/producer/ProducerConfig.java  |   7 -
 config/tools-log4j.properties                   |   2 -
 .../scala/kafka/producer/ConsoleProducer.scala  | 334 ++++++++++++-------
 .../kafka/producer/KafkaLog4jAppender.scala     |  36 +-
 .../scala/kafka/tools/ReplayLogProducer.scala   |  55 +--
 .../scala/kafka/tools/TestEndToEndLatency.scala |  72 ++++
 .../scala/kafka/tools/TestLogCleaning.scala     | 309 +++++++++++++++++
 .../scala/kafka/utils/CommandLineUtils.scala    |  15 +-
 .../scala/other/kafka/TestEndToEndLatency.scala |  71 ----
 .../scala/other/kafka/TestLogCleaning.scala     | 315 -----------------
 10 files changed, 623 insertions(+), 593 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/74c54c7e/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f874896..32e12ad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -90,12 +90,6 @@ public class ProducerConfig extends AbstractConfig {
     public static final String LINGER_MS_CONFIG = "linger.ms";
 
     /**
-     * Force a refresh of the cluster metadata after this period of time. This ensures that changes to the number of
-     * partitions or other settings will by taken up by producers without restart.
-     */
-    public static final String METADATA_REFRESH_MS_CONFIG = "topic.metadata.refresh.interval.ms";
-
-    /**
      * The id string to pass to the server when making requests. The purpose of this is to be able to track the source
      * of requests beyond just ip/port by allowing a logical application name to be included.
      */
@@ -158,7 +152,6 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(REQUIRED_ACKS_CONFIG, Type.INT, 1, between(-1, Short.MAX_VALUE), "blah blah")
                                 .define(REQUEST_TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), "blah blah")
                                 .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), "blah blah")
-                                .define(METADATA_REFRESH_MS_CONFIG, Type.LONG, 10 * 60 * 1000, atLeast(-1L), "blah blah")
                                 .define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah")
                                 .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah")
                                 .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), "blah blah")

http://git-wip-us.apache.org/repos/asf/kafka/blob/74c54c7e/config/tools-log4j.properties
----------------------------------------------------------------------
diff --git a/config/tools-log4j.properties b/config/tools-log4j.properties
index 7924049..52f07c9 100644
--- a/config/tools-log4j.properties
+++ b/config/tools-log4j.properties
@@ -18,5 +18,3 @@ log4j.rootLogger=WARN, stdout
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/74c54c7e/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 28de573..dd39ff2 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -17,95 +17,152 @@
 
 package kafka.producer
 
-import scala.collection.JavaConversions._
 import joptsimple._
 import java.util.Properties
 import java.io._
 import kafka.common._
 import kafka.message._
 import kafka.serializer._
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import kafka.utils.{CommandLineUtils, Utils}
 
 object ConsoleProducer { 
 
   def main(args: Array[String]) { 
+
+    val config = new ProducerConfig(args)
+    val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
+    reader.init(System.in, config.cmdLineProps)
+
+    try {
+        val producer =
+          if(config.useNewProducer) new NewShinyProducer(config)
+          else new OldProducer(config)
+
+        Runtime.getRuntime.addShutdownHook(new Thread() {
+          override def run() {
+            producer.close()
+          }
+        })
+
+        var message: KeyedMessage[Array[Byte], Array[Byte]] = null
+        do {
+          message = reader.readMessage()
+          if(message != null)
+            producer.send(message.topic, message.key, message.message)
+        } while(message != null)
+    } catch {
+      case e: Exception =>
+        e.printStackTrace
+        System.exit(1)
+    }
+    System.exit(0)
+  }
+
+  class ProducerConfig(args: Array[String]) {
     val parser = new OptionParser
     val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
-                           .withRequiredArg
-                           .describedAs("topic")
-                           .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
     val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
-                           .withRequiredArg
-                           .describedAs("broker-list")
-                           .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("broker-list")
+      .ofType(classOf[String])
     val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
     val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed")
     val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
-                             .withRequiredArg
-                             .describedAs("size")
-                             .ofType(classOf[java.lang.Integer])
-                             .defaultsTo(200)
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(200)
     val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retires before the producer give up and drop this message.")
-                             .withRequiredArg
-                             .ofType(classOf[java.lang.Integer])
-                             .defaultsTo(3)
+      .withRequiredArg
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(3)
     val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.")
-                             .withRequiredArg
-                             .ofType(classOf[java.lang.Long])
-                             .defaultsTo(100)
-    val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" + 
-                                                   " a message will queue awaiting suffient batch size. The value is given in ms.")
-                               .withRequiredArg
-                               .describedAs("timeout_ms")
-                               .ofType(classOf[java.lang.Long])
-                               .defaultsTo(1000)
-    val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " + 
-                                                   " messages will queue awaiting suffient batch size.")
-                               .withRequiredArg
-                               .describedAs("queue_size")
-                               .ofType(classOf[java.lang.Long])
-                               .defaultsTo(10000)
+      .withRequiredArg
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(100)
+    val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
+      " a message will queue awaiting suffient batch size. The value is given in ms.")
+      .withRequiredArg
+      .describedAs("timeout_ms")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(1000)
+    val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " +
+      " messages will queue awaiting suffient batch size.")
+      .withRequiredArg
+      .describedAs("queue_size")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(10000)
     val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue")
-                               .withRequiredArg
-                               .describedAs("queue enqueuetimeout ms")
-                               .ofType(classOf[java.lang.Long])
-                               .defaultsTo(Int.MaxValue)
+      .withRequiredArg
+      .describedAs("queue enqueuetimeout ms")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(Int.MaxValue)
     val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests")
-                               .withRequiredArg
-                               .describedAs("request required acks")
-                               .ofType(classOf[java.lang.Integer])
-                               .defaultsTo(0)
+      .withRequiredArg
+      .describedAs("request required acks")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
     val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero")
-                               .withRequiredArg
-                               .describedAs("request timeout ms")
-                               .ofType(classOf[java.lang.Integer])
-                               .defaultsTo(1500)
+      .withRequiredArg
+      .describedAs("request timeout ms")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1500)
+    val metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms",
+      "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes.")
+      .withRequiredArg
+      .describedAs("metadata expiration interval")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(5*60*1000L)
+    val metadataFetchTimeoutMsOpt = parser.accepts("metadata-fetch-timeout-ms",
+      "The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that topic.")
+      .withRequiredArg
+      .describedAs("metadata fetch timeout")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(60*1000L)
+    val maxMemoryBytesOpt = parser.accepts("max-memory-bytes",
+      "The total memory used by the producer to buffer records waiting to be sent to the server.")
+      .withRequiredArg
+      .describedAs("total memory in bytes")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(32 * 1024 * 1024L)
+    val maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes",
+      "The buffer size allocated for a partition. When records are received which are smaller than this size the producer " +
+        "will attempt to optimistically group them together until this size is reached.")
+      .withRequiredArg
+      .describedAs("memory in bytes per partition")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(16 * 1024L)
     val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.")
-                                 .withRequiredArg
-                                 .describedAs("encoder_class")
-                                 .ofType(classOf[java.lang.String])
-                                 .defaultsTo(classOf[StringEncoder].getName)
+      .withRequiredArg
+      .describedAs("encoder_class")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo(classOf[DefaultEncoder].getName)
     val keyEncoderOpt = parser.accepts("key-serializer", "The class name of the message encoder implementation to use for serializing keys.")
-                                 .withRequiredArg
-                                 .describedAs("encoder_class")
-                                 .ofType(classOf[java.lang.String])
-                                 .defaultsTo(classOf[StringEncoder].getName)
-    val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " + 
-                                                          "By default each line is read as a separate message.")
-                                  .withRequiredArg
-                                  .describedAs("reader_class")
-                                  .ofType(classOf[java.lang.String])
-                                  .defaultsTo(classOf[LineMessageReader].getName)
+      .withRequiredArg
+      .describedAs("encoder_class")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo(classOf[DefaultEncoder].getName)
+    val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
+      "By default each line is read as a separate message.")
+      .withRequiredArg
+      .describedAs("reader_class")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo(classOf[LineMessageReader].getName)
     val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
-                                  .withRequiredArg
-                                  .describedAs("size")
-                                  .ofType(classOf[java.lang.Integer])
-                                  .defaultsTo(1024*100)
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1024*100)
     val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " +
-                                                 "This allows custom configuration for a user-defined message reader.")
-                            .withRequiredArg
-                            .describedAs("prop")
-                            .ofType(classOf[String])
-                            
+      "This allows custom configuration for a user-defined message reader.")
+      .withRequiredArg
+      .describedAs("prop")
+      .ofType(classOf[String])
+    val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
 
     val options = parser.parse(args : _*)
     for(arg <- List(topicOpt, brokerListOpt)) {
@@ -116,6 +173,8 @@ object ConsoleProducer {
       }
     }
 
+    import scala.collection.JavaConversions._
+    val useNewProducer = options.has(useNewProducerOpt)
     val topic = options.valueOf(topicOpt)
     val brokerList = options.valueOf(brokerListOpt)
     val sync = options.has(syncOpt)
@@ -126,76 +185,28 @@ object ConsoleProducer {
     val queueEnqueueTimeoutMs = options.valueOf(queueEnqueueTimeoutMsOpt)
     val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt)
     val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt)
+    val messageSendMaxRetries = options.valueOf(messageSendMaxRetriesOpt)
+    val retryBackoffMs = options.valueOf(retryBackoffMsOpt)
     val keyEncoderClass = options.valueOf(keyEncoderOpt)
     val valueEncoderClass = options.valueOf(valueEncoderOpt)
     val readerClass = options.valueOf(messageReaderOpt)
     val socketBuffer = options.valueOf(socketBufferSizeOpt)
-    val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
+    val cmdLineProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt))
     cmdLineProps.put("topic", topic)
-
-    val props = new Properties()
-    props.put("metadata.broker.list", brokerList)
-    val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
-    props.put("compression.codec", codec.toString)
-    props.put("producer.type", if(sync) "sync" else "async")
-    if(options.has(batchSizeOpt))
-      props.put("batch.num.messages", batchSize.toString)
-    
-    props.put("message.send.max.retries", options.valueOf(messageSendMaxRetriesOpt).toString)
-    props.put("retry.backoff.ms", options.valueOf(retryBackoffMsOpt).toString)
-    props.put("queue.buffering.max.ms", sendTimeout.toString)
-    props.put("queue.buffering.max.messages", queueSize.toString)
-    props.put("queue.enqueue.timeout.ms", queueEnqueueTimeoutMs.toString)
-    props.put("request.required.acks", requestRequiredAcks.toString)
-    props.put("request.timeout.ms", requestTimeoutMs.toString)
-    props.put("key.serializer.class", keyEncoderClass)
-    props.put("serializer.class", valueEncoderClass)
-    props.put("send.buffer.bytes", socketBuffer.toString)
-    val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]]
-    reader.init(System.in, cmdLineProps)
-
-    try {
-        val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
-
-        Runtime.getRuntime.addShutdownHook(new Thread() {
-          override def run() {
-            producer.close()
-          }
-        })
-
-        var message: KeyedMessage[AnyRef, AnyRef] = null
-        do {
-          message = reader.readMessage()
-          if(message != null)
-            producer.send(message)
-        } while(message != null)
-    } catch {
-      case e: Exception =>
-        e.printStackTrace
-        System.exit(1)
-    }
-    System.exit(0)
-  }
-
-  def parseLineReaderArgs(args: Iterable[String]): Properties = {
-    val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
-    if(!splits.forall(_.length == 2)) {
-      System.err.println("Invalid line reader properties: " + args.mkString(" "))
-      System.exit(1)
-    }
-    val props = new Properties
-    for(a <- splits)
-      props.put(a(0), a(1))
-    props
+    /* new producer related configs */
+    val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt)
+    val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt)
+    val metadataExpiryMs = options.valueOf(metadataExpiryMsOpt)
+    val metadataFetchTimeoutMs = options.valueOf(metadataFetchTimeoutMsOpt)
   }
 
-  trait MessageReader[K,V] { 
+  trait MessageReader {
     def init(inputStream: InputStream, props: Properties) {}
-    def readMessage(): KeyedMessage[K,V]
+    def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]
     def close() {}
   }
 
-  class LineMessageReader extends MessageReader[String, String] {
+  class LineMessageReader extends MessageReader {
     var topic: String = null
     var reader: BufferedReader = null
     var parseKey = false
@@ -222,17 +233,84 @@ object ConsoleProducer {
           line.indexOf(keySeparator) match {
             case -1 =>
               if(ignoreError)
-                new KeyedMessage(topic, line)
+                new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes())
               else
                 throw new KafkaException("No key found on line " + lineNumber + ": " + line)
             case n =>
-              new KeyedMessage(topic,
-                             line.substring(0, n), 
-                             if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size))
+              new KeyedMessage[Array[Byte], Array[Byte]](topic,
+                             line.substring(0, n).getBytes,
+                             (if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes())
           }
         case (line, false) =>
-          new KeyedMessage(topic, line)
+          new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes())
       }
     }
   }
+
+  trait Producer {
+    def send(topic: String, key: Array[Byte], bytes: Array[Byte])
+    def close()
+  }
+
+  class NewShinyProducer(producerConfig: ProducerConfig) extends Producer {
+    val props = new Properties()
+    props.put("metadata.broker.list", producerConfig.brokerList)
+    val codec = if(producerConfig.compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
+    props.put("compression.codec", codec.toString)
+    props.put("send.buffer.bytes", producerConfig.socketBuffer.toString)
+    props.put("metadata.fetch.backoff.ms", producerConfig.retryBackoffMs.toString)
+    props.put("metadata.expiry.ms", producerConfig.metadataExpiryMs.toString)
+    props.put("metadata.fetch.timeout.ms", producerConfig.metadataFetchTimeoutMs.toString)
+    props.put("request.required.acks", producerConfig.requestRequiredAcks.toString)
+    props.put("request.timeout.ms", producerConfig.requestTimeoutMs.toString)
+    props.put("request.retries", producerConfig.messageSendMaxRetries.toString)
+    props.put("linger.ms", producerConfig.sendTimeout.toString)
+    if(producerConfig.queueEnqueueTimeoutMs != -1)
+      props.put("block.on.buffer.full", "false")
+    props.put("total.memory.bytes", producerConfig.maxMemoryBytes.toString)
+    props.put("max.partition.bytes", producerConfig.maxPartitionMemoryBytes.toString)
+    props.put("client.id", "console-producer")
+    val producer = new KafkaProducer(props)
+
+    def send(topic: String, key: Array[Byte], bytes: Array[Byte]) {
+      val response = this.producer.send(new ProducerRecord(topic, key, bytes))
+      if(producerConfig.sync) {
+        response.get()
+      }
+    }
+
+    def close() {
+      this.producer.close()
+    }
+  }
+
+  class OldProducer(producerConfig: ConsoleProducer.ProducerConfig) extends Producer {
+    val props = new Properties()
+    props.put("metadata.broker.list", producerConfig.brokerList)
+    val codec = if(producerConfig.compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
+    props.put("compression.codec", codec.toString)
+    props.put("producer.type", if(producerConfig.sync) "sync" else "async")
+    props.put("batch.num.messages", producerConfig.batchSize.toString)
+    props.put("message.send.max.retries", producerConfig.messageSendMaxRetries.toString)
+    props.put("retry.backoff.ms", producerConfig.retryBackoffMs.toString)
+    props.put("queue.buffering.max.ms", producerConfig.sendTimeout.toString)
+    props.put("queue.buffering.max.messages", producerConfig.queueSize.toString)
+    props.put("queue.enqueue.timeout.ms", producerConfig.queueEnqueueTimeoutMs.toString)
+    props.put("request.required.acks", producerConfig.requestRequiredAcks.toString)
+    props.put("request.timeout.ms", producerConfig.requestTimeoutMs.toString)
+    props.put("key.serializer.class", producerConfig.keyEncoderClass)
+    props.put("serializer.class", producerConfig.valueEncoderClass)
+    props.put("send.buffer.bytes", producerConfig.socketBuffer.toString)
+    props.put("topic.metadata.refresh.interval.ms", producerConfig.metadataExpiryMs.toString)
+    props.put("client.id", "console-producer")
+    val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new kafka.producer.ProducerConfig(props))
+
+    def send(topic: String, key: Array[Byte], bytes: Array[Byte]) {
+      this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topic, key, bytes))
+    }
+
+    def close() {
+      this.producer.close()
+    }
+  }
 }
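
For readers skimming the diff, a minimal standalone sketch of the new-producer path that NewShinyProducer wraps above may help. The property names, the ProducerRecord constructor and the blocking get() call mirror the code in this commit; the broker address, topic, key and value below are placeholders, not values taken from the patch:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object NewProducerSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("metadata.broker.list", "localhost:9092") // placeholder broker list
        props.put("client.id", "console-producer")
        val producer = new KafkaProducer(props)
        // send() returns a future; calling get() on it gives the synchronous (--sync) behaviour
        val response = producer.send(new ProducerRecord("test-topic", "key".getBytes, "value".getBytes))
        response.get()
        producer.close()
      }
    }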

http://git-wip-us.apache.org/repos/asf/kafka/blob/74c54c7e/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
index 42239b2..0067a53 100644
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
@@ -23,18 +23,15 @@ import org.apache.log4j.AppenderSkeleton
 import org.apache.log4j.helpers.LogLog
 import kafka.utils.Logging
 import java.util.{Properties, Date}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 
 class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   var topic:String = null
-  var serializerClass:String = null
   var brokerList:String = null
-  var producerType:String = null
   var compressionCodec:String = null
-  var enqueueTimeout:String = null
-  var queueSize:String = null
   var requiredNumAcks: Int = Int.MaxValue
 
-  private var producer: Producer[String, String] = null
+  private var producer: KafkaProducer = null
 
   def getTopic:String = topic
   def setTopic(topic: String) { this.topic = topic }
@@ -42,21 +39,9 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   def getBrokerList:String = brokerList
   def setBrokerList(brokerList: String) { this.brokerList = brokerList }
 
-  def getSerializerClass:String = serializerClass
-  def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass }
-
-  def getProducerType:String = producerType
-  def setProducerType(producerType:String) { this.producerType = producerType }
-
   def getCompressionCodec:String = compressionCodec
   def setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec }
 
-  def getEnqueueTimeout:String = enqueueTimeout
-  def setEnqueueTimeout(enqueueTimeout:String) { this.enqueueTimeout = enqueueTimeout }
-
-  def getQueueSize:String = queueSize
-  def setQueueSize(queueSize:String) { this.queueSize = queueSize }
-
   def getRequiredNumAcks:Int = requiredNumAcks
   def setRequiredNumAcks(requiredNumAcks:Int) { this.requiredNumAcks = requiredNumAcks }
 
@@ -69,28 +54,17 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
       throw new MissingConfigException("The metadata.broker.list property should be specified")
     if(topic == null)
       throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
-    if(serializerClass == null) {
-      serializerClass = "kafka.serializer.StringEncoder"
-      LogLog.debug("Using default encoder - kafka.serializer.StringEncoder")
-    }
-    props.put("serializer.class", serializerClass)
-    //These have default values in ProducerConfig and AsyncProducerConfig. We don't care if they're not specified
-    if(producerType != null) props.put("producer.type", producerType)
     if(compressionCodec != null) props.put("compression.codec", compressionCodec)
-    if(enqueueTimeout != null) props.put("queue.enqueue.timeout.ms", enqueueTimeout)
-    if(queueSize != null) props.put("queue.buffering.max.messages", queueSize)
     if(requiredNumAcks != Int.MaxValue) props.put("request.required.acks", requiredNumAcks.toString)
-    val config : ProducerConfig = new ProducerConfig(props)
-    producer = new Producer[String, String](config)
-    LogLog.debug("Kafka producer connected to " +  config.brokerList)
+    producer = new KafkaProducer(props)
+    LogLog.debug("Kafka producer connected to " +  brokerList)
     LogLog.debug("Logging for topic: " + topic)
   }
 
   override def append(event: LoggingEvent)  {
     val message = subAppend(event)
     LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
-    val messageData = new KeyedMessage[String, String](topic, message)
-    producer.send(messageData);
+    producer.send(new ProducerRecord(topic, message.getBytes()));
   }
 
   def subAppend(event: LoggingEvent): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/74c54c7e/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 814d61a..f2246f9 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -20,11 +20,10 @@ package kafka.tools
 import joptsimple.OptionParser
 import java.util.concurrent.{Executors, CountDownLatch}
 import java.util.Properties
-import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
 import kafka.consumer._
-import kafka.utils.{Logging, ZkUtils}
+import kafka.utils.{CommandLineUtils, Logging, ZkUtils}
 import kafka.api.OffsetRequest
-import kafka.message.CompressionCodec
+import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
 
 object ReplayLogProducer extends Logging {
 
@@ -88,17 +87,6 @@ object ReplayLogProducer extends Logging {
       .describedAs("count")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(-1)
-    val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
-    val delayMSBtwBatchOpt = parser.accepts("delay-btw-batch-ms", "Delay in ms between 2 batch sends.")
-      .withRequiredArg
-      .describedAs("ms")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(0)
-    val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
-      .withRequiredArg
-      .describedAs("batch size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(200)
     val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
       .withRequiredArg
       .describedAs("threads")
@@ -109,11 +97,12 @@ object ReplayLogProducer extends Logging {
       .describedAs("size")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(5000)
-    val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed")
+    val propertyOpt = parser.accepts("property", "A mechanism to pass properties in the form key=value to the producer. " +
+      "This allows the user to override producer properties that are not exposed by the existing command line arguments")
       .withRequiredArg
-      .describedAs("compression codec ")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(0)
+      .describedAs("producer properties")
+      .ofType(classOf[String])
+    val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
 
     val options = parser.parse(args : _*)
     for(arg <- List(brokerListOpt, inputTopicOpt)) {
@@ -126,31 +115,19 @@ object ReplayLogProducer extends Logging {
     val zkConnect = options.valueOf(zkConnectOpt)
     val brokerList = options.valueOf(brokerListOpt)
     val numMessages = options.valueOf(numMessagesOpt).intValue
-    val isAsync = options.has(asyncOpt)
-    val delayedMSBtwSend = options.valueOf(delayMSBtwBatchOpt).longValue
-    var batchSize = options.valueOf(batchSizeOpt).intValue
     val numThreads = options.valueOf(numThreadsOpt).intValue
     val inputTopic = options.valueOf(inputTopicOpt)
     val outputTopic = options.valueOf(outputTopicOpt)
     val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
-    val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
+    val isSync = options.has(syncOpt)
+    import scala.collection.JavaConversions._
+    val producerProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt))
+    producerProps.put("metadata.broker.list", brokerList)
   }
 
   class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging {
     val shutdownLatch = new CountDownLatch(1)
-    val props = new Properties()
-    props.put("metadata.broker.list", config.brokerList)
-    props.put("reconnect.interval", Integer.MAX_VALUE.toString)
-    props.put("send.buffer.bytes", (64*1024).toString)
-    props.put("compression.codec", config.compressionCodec.codec.toString)
-    props.put("batch.num.messages", config.batchSize.toString)
-    props.put("queue.enqueue.timeout.ms", "-1")
-    
-    if(config.isAsync)
-      props.put("producer.type", "async")
-
-    val producerConfig = new ProducerConfig(props)
-    val producer = new Producer[Array[Byte], Array[Byte]](producerConfig)
+    val producer = new KafkaProducer(config.producerProps)
 
     override def run() {
       info("Starting consumer thread..")
@@ -163,9 +140,11 @@ object ReplayLogProducer extends Logging {
             stream
         for (messageAndMetadata <- iter) {
           try {
-            producer.send(new KeyedMessage[Array[Byte], Array[Byte]](config.outputTopic, messageAndMetadata.message))
-            if (config.delayedMSBtwSend > 0 && (messageCount + 1) % config.batchSize == 0)
-              Thread.sleep(config.delayedMSBtwSend)
+            val response = producer.send(new ProducerRecord(config.outputTopic,
+                                            messageAndMetadata.key(), messageAndMetadata.message()))
+            if(config.isSync) {
+              response.get()
+            }
             messageCount += 1
           }catch {
             case ie: Exception => error("Skipping this message", ie)

http://git-wip-us.apache.org/repos/asf/kafka/blob/74c54c7e/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
new file mode 100644
index 0000000..ea856c7
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.util.Properties
+import kafka.consumer._
+import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
+
+object TestEndToEndLatency {
+  def main(args: Array[String]) {
+    if (args.length != 4) {
+      System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages")
+      System.exit(1)
+    }
+
+    val brokerList = args(0)
+    val zkConnect = args(1)
+    val topic = args(2)
+    val numMessages = args(3).toInt
+
+    val consumerProps = new Properties()
+    consumerProps.put("group.id", topic)
+    consumerProps.put("auto.commit.enable", "true")
+    consumerProps.put("auto.offset.reset", "largest")
+    consumerProps.put("zookeeper.connect", zkConnect)
+    consumerProps.put("socket.timeout.ms", 1201000.toString)
+
+    val config = new ConsumerConfig(consumerProps)
+    val connector = Consumer.create(config)
+    var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
+    val iter = stream.iterator
+
+    val producerProps = new Properties()
+    producerProps.put("metadata.broker.list", brokerList)
+    producerProps.put("linger.ms", "0")
+    producerProps.put("block.on.buffer.full", "true")
+    val producer = new KafkaProducer(producerProps)
+
+    val message = "hello there beautiful".getBytes
+    var totalTime = 0.0
+    for (i <- 0 until numMessages) {
+      var begin = System.nanoTime
+      val response = producer.send(new ProducerRecord(topic, message))
+      response.get()
+      val received = iter.next
+      val elapsed = System.nanoTime - begin
+      // poor man's progress bar
+      if (i % 1000 == 0)
+        println(i + "\t" + elapsed / 1000.0 / 1000.0)
+      totalTime += elapsed
+    }
+    println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0) + "ms")
+    producer.close()
+    connector.shutdown()
+    System.exit(0)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/74c54c7e/core/src/main/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/TestLogCleaning.scala b/core/src/main/scala/kafka/tools/TestLogCleaning.scala
new file mode 100644
index 0000000..edb6e5f
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/TestLogCleaning.scala
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import joptsimple.OptionParser
+import java.util.Properties
+import java.util.Random
+import java.io._
+import kafka.consumer._
+import kafka.serializer._
+import kafka.utils._
+import kafka.log.FileMessageSet
+import kafka.log.Log
+import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
+
+/**
+ * This is a torture test that runs against an existing broker. Here is how it works:
+ * 
+ * It produces a series of specially formatted messages to one or more partitions. Each message it produces
+ * it logs out to a text file. The messages have a limited set of keys, so there is duplication in the key space.
+ * 
+ * The broker will clean its log as the test runs.
+ * 
+ * When the specified number of messages have been produced we create a consumer and consume all the messages in the topic
+ * and write that out to another text file.
+ * 
+ * Using a stable unix sort we sort both the producer log of what was sent and the consumer log of what was retrieved by the message key. 
+ * Then we compare the final message in both logs for each key. If this final message is not the same for all keys we
+ * print an error and exit with exit code 1, otherwise we print the size reduction and exit with exit code 0.
+ */
+object TestLogCleaning {
+
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+    val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.")
+                               .withRequiredArg
+                               .describedAs("count")
+                               .ofType(classOf[java.lang.Long])
+                               .defaultsTo(Long.MaxValue)
+    val numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.")
+                           .withRequiredArg
+                           .describedAs("count")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(5)
+    val brokerOpt = parser.accepts("broker", "Url to connect to.")
+                          .withRequiredArg
+                          .describedAs("url")
+                          .ofType(classOf[String])
+    val topicsOpt = parser.accepts("topics", "The number of topics to test.")
+                          .withRequiredArg
+                          .describedAs("count")
+                          .ofType(classOf[java.lang.Integer])
+                          .defaultsTo(1)
+    val percentDeletesOpt = parser.accepts("percent-deletes", "The percentage of updates that are deletes.")
+    		                      .withRequiredArg
+    		                      .describedAs("percent")
+    		                      .ofType(classOf[java.lang.Integer])
+    		                      .defaultsTo(0)
+    val zkConnectOpt = parser.accepts("zk", "Zk url.")
+                             .withRequiredArg
+                             .describedAs("url")
+                             .ofType(classOf[String])
+    val sleepSecsOpt = parser.accepts("sleep", "Time to sleep between production and consumption.")
+                             .withRequiredArg
+                             .describedAs("ms")
+                             .ofType(classOf[java.lang.Integer])
+                             .defaultsTo(0)
+    val dumpOpt = parser.accepts("dump", "Dump the message contents of a topic partition that contains test data from this test to standard out.")
+                        .withRequiredArg
+                        .describedAs("directory")
+                        .ofType(classOf[String])
+    
+    val options = parser.parse(args:_*)
+    
+    if(options.has(dumpOpt)) {
+      dumpLog(new File(options.valueOf(dumpOpt)))
+      System.exit(0)
+    }
+    
+    if(!options.has(brokerOpt) || !options.has(zkConnectOpt) || !options.has(numMessagesOpt)) {
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+    
+    // parse options
+    val messages = options.valueOf(numMessagesOpt).longValue
+    val percentDeletes = options.valueOf(percentDeletesOpt).intValue
+    val dups = options.valueOf(numDupsOpt).intValue
+    val brokerUrl = options.valueOf(brokerOpt)
+    val topicCount = options.valueOf(topicsOpt).intValue
+    val zkUrl = options.valueOf(zkConnectOpt)
+    val sleepSecs = options.valueOf(sleepSecsOpt).intValue
+    
+    val testId = new Random().nextInt(Int.MaxValue)
+    val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray
+    
+    println("Producing %d messages...".format(messages))
+    val producedDataFile = produceMessages(brokerUrl, topics, messages, dups, percentDeletes)
+    println("Sleeping for %d seconds...".format(sleepSecs))
+    Thread.sleep(sleepSecs * 1000)
+    println("Consuming messages...")
+    val consumedDataFile = consumeMessages(zkUrl, topics)
+    
+    val producedLines = lineCount(producedDataFile)
+    val consumedLines = lineCount(consumedDataFile)
+    val reduction = 1.0 - consumedLines.toDouble/producedLines.toDouble
+    println("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).".format(producedLines, consumedLines, 100 * reduction))
+    
+    println("De-duplicating and validating output files...")
+    validateOutput(producedDataFile, consumedDataFile)
+    producedDataFile.delete()
+    consumedDataFile.delete()
+  }
+  
+  def dumpLog(dir: File) {
+    require(dir.exists, "Non-existent directory: " + dir.getAbsolutePath)
+    for(file <- dir.list.sorted; if file.endsWith(Log.LogFileSuffix)) {
+      val ms = new FileMessageSet(new File(dir, file))
+      for(entry <- ms) {
+        val key = Utils.readString(entry.message.key)
+        val content = 
+          if(entry.message.isNull)
+            null
+          else
+            Utils.readString(entry.message.payload)
+        println("offset = %s, key = %s, content = %s".format(entry.offset, key, content))
+      }
+    }
+  }
+  
+  def lineCount(file: File): Int = io.Source.fromFile(file).getLines.size
+  
+  def validateOutput(producedDataFile: File, consumedDataFile: File) {
+    val producedReader = externalSort(producedDataFile)
+    val consumedReader = externalSort(consumedDataFile)
+    val produced = valuesIterator(producedReader)
+    val consumed = valuesIterator(consumedReader)
+    val producedDedupedFile = new File(producedDataFile.getAbsolutePath + ".deduped")
+    val producedDeduped = new BufferedWriter(new FileWriter(producedDedupedFile), 1024*1024)
+    val consumedDedupedFile = new File(consumedDataFile.getAbsolutePath + ".deduped")
+    val consumedDeduped = new BufferedWriter(new FileWriter(consumedDedupedFile), 1024*1024)
+    var total = 0
+    var mismatched = 0
+    while(produced.hasNext && consumed.hasNext) {
+      val p = produced.next()
+      producedDeduped.write(p.toString)
+      producedDeduped.newLine()
+      val c = consumed.next()
+      consumedDeduped.write(c.toString)
+      consumedDeduped.newLine()
+      if(p != c)
+        mismatched += 1
+      total += 1
+    }
+    producedDeduped.close()
+    consumedDeduped.close()
+    require(!produced.hasNext, "Additional values produced not found in consumer log.")
+    require(!consumed.hasNext, "Additional values consumed not found in producer log.")
+    println("Validated " + total + " values, " + mismatched + " mismatches.")
+    require(mismatched == 0, "Non-zero number of row mismatches.")
+    // if all the checks worked out we can delete the deduped files
+    producedDedupedFile.delete()
+    consumedDedupedFile.delete()
+  }
+  
+  def valuesIterator(reader: BufferedReader) = {
+    new IteratorTemplate[TestRecord] {
+      def makeNext(): TestRecord = {
+        var next = readNext(reader)
+        while(next != null && next.delete)
+          next = readNext(reader)
+        if(next == null)
+          allDone()
+        else
+          next
+      }
+    }
+  }
+  
+  def readNext(reader: BufferedReader): TestRecord = {
+    var line = reader.readLine()
+    if(line == null)
+      return null
+    var curr = new TestRecord(line)
+    while(true) {
+      line = peekLine(reader)
+      if(line == null)
+        return curr
+      val next = new TestRecord(line)
+      if(next == null || next.topicAndKey != curr.topicAndKey)
+        return curr
+      curr = next
+      reader.readLine()
+    }
+    null
+  }
+  
+  def peekLine(reader: BufferedReader) = {
+    reader.mark(4096)
+    val line = reader.readLine
+    reader.reset()
+    line
+  }
+  
+  def externalSort(file: File): BufferedReader = {
+    val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", "--temporary-directory=" + System.getProperty("java.io.tmpdir"), file.getAbsolutePath)
+    val process = builder.start()
+    new Thread() {
+      override def run() {
+        val exitCode = process.waitFor()
+        if(exitCode != 0) {
+          System.err.println("Process exited abnormally.")
+          while(process.getErrorStream.available > 0) {
+            System.err.write(process.getErrorStream().read())
+          }
+        }
+      }
+    }.start()
+    new BufferedReader(new InputStreamReader(process.getInputStream()), 10*1024*1024)
+  }
+  
+  def produceMessages(brokerUrl: String, 
+                      topics: Array[String], 
+                      messages: Long, 
+                      dups: Int,
+                      percentDeletes: Int): File = {
+    val producerProps = new Properties
+    producerProps.setProperty("block.on.buffer.full", "true")
+    producerProps.setProperty("metadata.broker.list", brokerUrl)
+    val producer = new KafkaProducer(producerProps)
+    val rand = new Random(1)
+    val keyCount = (messages / dups).toInt
+    val producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt")
+    println("Logging produce requests to " + producedFile.getAbsolutePath)
+    val producedWriter = new BufferedWriter(new FileWriter(producedFile), 1024*1024)
+    for(i <- 0L until (messages * topics.length)) {
+      val topic = topics((i % topics.length).toInt)
+      val key = rand.nextInt(keyCount)
+      val delete = i % 100 < percentDeletes
+      val msg = 
+        if(delete)
+          new ProducerRecord(topic, key.toString.getBytes(), null)
+        else
+          new ProducerRecord(topic, key.toString.getBytes(), i.toString.getBytes())
+      producer.send(msg)
+      producedWriter.write(TestRecord(topic, key, i, delete).toString)
+      producedWriter.newLine()
+    }
+    producedWriter.close()
+    producer.close()
+    producedFile
+  }
+  
+  def makeConsumer(zkUrl: String, topics: Array[String]): ZookeeperConsumerConnector = {
+    val consumerProps = new Properties
+    consumerProps.setProperty("group.id", "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
+    consumerProps.setProperty("zookeeper.connect", zkUrl)
+    consumerProps.setProperty("consumer.timeout.ms", (20*1000).toString)
+    consumerProps.setProperty("auto.offset.reset", "smallest")
+    new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps))
+  }
+  
+  def consumeMessages(zkUrl: String, topics: Array[String]): File = {
+    val connector = makeConsumer(zkUrl, topics)
+    val streams = connector.createMessageStreams(topics.map(topic => (topic, 1)).toMap, new StringDecoder, new StringDecoder)
+    val consumedFile = File.createTempFile("kafka-log-cleaner-consumed-", ".txt")
+    println("Logging consumed messages to " + consumedFile.getAbsolutePath)
+    val consumedWriter = new BufferedWriter(new FileWriter(consumedFile))
+    for(topic <- topics) {
+      val stream = streams(topic).head
+      try {
+        for(item <- stream) {
+          val delete = item.message == null
+          val value = if(delete) -1L else item.message.toLong
+          consumedWriter.write(TestRecord(topic, item.key.toInt, value, delete).toString)
+          consumedWriter.newLine()
+        }
+      } catch {
+        case e: ConsumerTimeoutException => 
+      }
+    }
+    consumedWriter.close()
+    connector.shutdown()
+    consumedFile
+  }
+  
+}
+
+case class TestRecord(val topic: String, val key: Int, val value: Long, val delete: Boolean) {
+  def this(pieces: Array[String]) = this(pieces(0), pieces(1).toInt, pieces(2).toLong, pieces(3) == "d")
+  def this(line: String) = this(line.split("\t"))
+  override def toString() = topic + "\t" +  key + "\t" + value + "\t" + (if(delete) "d" else "u")
+  def topicAndKey = topic + key
+}
\ No newline at end of file
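
As a concrete illustration of what the validation step sorts and compares: TestRecord.toString above writes tab-separated topic, key, value and a d/u delete flag, and the single-argument constructors parse the same line back. A hypothetical produced log for one key might contain:

    log-cleaner-test-42-0	7	100	u
    log-cleaner-test-42-0	7	107	d

The stable "sort --key=1,2" groups lines by topic and key, so readNext only has to keep the last record seen for each topicAndKey, and valuesIterator drops the delete (d) entries before the produced and consumed streams are compared.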

http://git-wip-us.apache.org/repos/asf/kafka/blob/74c54c7e/core/src/main/scala/kafka/utils/CommandLineUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index 726c302..c1d8ba5 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -18,6 +18,7 @@
 
 import joptsimple.{OptionSpec, OptionSet, OptionParser}
 import scala.collection.Set
+ import java.util.Properties
 
  /**
  * Helper functions for dealing with command line utilities
@@ -45,4 +46,16 @@ object CommandLineUtils extends Logging {
       }
     }
   }
-}
\ No newline at end of file
+
+   def parseCommandLineArgs(args: Iterable[String]): Properties = {
+     val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
+     if(!splits.forall(_.length == 2)) {
+       System.err.println("Invalid command line properties: " + args.mkString(" "))
+       System.exit(1)
+     }
+     val props = new Properties
+     for(a <- splits)
+       props.put(a(0), a(1))
+     props
+   }
+ }
\ No newline at end of file
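
A hedged usage sketch of the new helper (the key=value strings are made-up examples, not options defined by this patch); ConsoleProducer and ReplayLogProducer call it with the values of their --property flag, converted via scala.collection.JavaConversions:

    import kafka.utils.CommandLineUtils

    // splits each "key=value" string and collects the pairs into a java.util.Properties,
    // exiting with an error if any entry is malformed
    val props = CommandLineUtils.parseCommandLineArgs(Seq("request.required.acks=1", "client.id=replay-log-producer"))
    // props.getProperty("client.id") now returns "replay-log-producer"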

http://git-wip-us.apache.org/repos/asf/kafka/blob/74c54c7e/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
deleted file mode 100644
index f5d39dd..0000000
--- a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka
-
-import java.util.Properties
-import kafka.consumer._
-import kafka.producer._
-import kafka.message._
-
-object TestEndToEndLatency {
-  def main(args: Array[String]) {
-    if (args.length != 4) {
-      System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages")
-      System.exit(1)
-    }
-
-    val brokerList = args(0)
-    val zkConnect = args(1)
-    val topic = args(2)
-    val numMessages = args(3).toInt
-
-    val consumerProps = new Properties()
-    consumerProps.put("group.id", topic)
-    consumerProps.put("auto.commit", "true")
-    consumerProps.put("auto.offset.reset", "largest")
-    consumerProps.put("zookeeper.connect", zkConnect)
-    consumerProps.put("socket.timeout.ms", 1201000.toString)
-
-    val config = new ConsumerConfig(consumerProps)
-    val connector = Consumer.create(config)
-    var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
-    val iter = stream.iterator
-
-    val producerProps = new Properties()
-    producerProps.put("metadata.broker.list", brokerList)
-    producerProps.put("producer.type", "sync")
-    val producer = new Producer[Any, Any](new ProducerConfig(producerProps))
-
-    val message = "hello there beautiful".getBytes
-    var totalTime = 0.0
-    for (i <- 0 until numMessages) {
-      var begin = System.nanoTime
-      producer.send(new KeyedMessage(topic, message))
-      val received = iter.next
-      val elapsed = System.nanoTime - begin
-      // poor man's progress bar
-      if (i % 1000 == 0)
-        println(i + "\t" + elapsed / 1000.0 / 1000.0)
-      totalTime += elapsed
-    }
-    println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0) + "ms")
-    producer.close()
-    connector.shutdown()
-    System.exit(0)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/74c54c7e/core/src/test/scala/other/kafka/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLogCleaning.scala b/core/src/test/scala/other/kafka/TestLogCleaning.scala
deleted file mode 100644
index d20d132..0000000
--- a/core/src/test/scala/other/kafka/TestLogCleaning.scala
+++ /dev/null
@@ -1,315 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka
-
-import joptsimple.OptionParser
-import java.util.Properties
-import java.util.Random
-import java.io._
-import scala.io.Source
-import scala.io.BufferedSource
-import kafka.producer._
-import kafka.consumer._
-import kafka.serializer._
-import kafka.utils._
-import kafka.log.FileMessageSet
-import kafka.log.Log
-
-/**
- * This is a torture test that runs against an existing broker. Here is how it works:
- * 
- * It produces a series of specially formatted messages to one or more partitions. Each message it produces
- * is also logged to a text file. The messages have a limited set of keys, so there is duplication in the key space.
- * 
- * The broker will clean its log as the test runs.
- * 
- * When the specified number of messages have been produced we create a consumer and consume all the messages in the topic
- * and write that out to another text file.
- * 
- * Using a stable unix sort, we sort both the producer log of what was sent and the consumer log of what was retrieved, by message key.
- * Then we compare the final message in both logs for each key. If this final message is not the same for all keys, we
- * print an error and exit with exit code 1; otherwise we print the size reduction and exit with exit code 0.
- */
-object TestLogCleaning {
-
-  def main(args: Array[String]) {
-    val parser = new OptionParser
-    val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.")
-                               .withRequiredArg
-                               .describedAs("count")
-                               .ofType(classOf[java.lang.Long])
-                               .defaultsTo(Long.MaxValue)
-    val numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.")
-                           .withRequiredArg
-                           .describedAs("count")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(5)
-    val brokerOpt = parser.accepts("broker", "Url to connect to.")
-                          .withRequiredArg
-                          .describedAs("url")
-                          .ofType(classOf[String])
-    val topicsOpt = parser.accepts("topics", "The number of topics to test.")
-                          .withRequiredArg
-                          .describedAs("count")
-                          .ofType(classOf[java.lang.Integer])
-                          .defaultsTo(1)
-    val percentDeletesOpt = parser.accepts("percent-deletes", "The percentage of updates that are deletes.")
-                                  .withRequiredArg
-                                  .describedAs("percent")
-                                  .ofType(classOf[java.lang.Integer])
-                                  .defaultsTo(0)
-    val zkConnectOpt = parser.accepts("zk", "Zk url.")
-                             .withRequiredArg
-                             .describedAs("url")
-                             .ofType(classOf[String])
-    val sleepSecsOpt = parser.accepts("sleep", "Time to sleep between production and consumption.")
-                             .withRequiredArg
-                             .describedAs("seconds")
-                             .ofType(classOf[java.lang.Integer])
-                             .defaultsTo(0)
-    val dumpOpt = parser.accepts("dump", "Dump the message contents of a topic partition that contains test data from this test to standard out.")
-                        .withRequiredArg
-                        .describedAs("directory")
-                        .ofType(classOf[String])
-    
-    val options = parser.parse(args:_*)
-    
-    if(options.has(dumpOpt)) {
-      dumpLog(new File(options.valueOf(dumpOpt)))
-      System.exit(0)
-    }
-    
-    if(!options.has(brokerOpt) || !options.has(zkConnectOpt) || !options.has(numMessagesOpt)) {
-      parser.printHelpOn(System.err)
-      System.exit(1)
-    }
-    
-    // parse options
-    val messages = options.valueOf(numMessagesOpt).longValue
-    val percentDeletes = options.valueOf(percentDeletesOpt).intValue
-    val dups = options.valueOf(numDupsOpt).intValue
-    val brokerUrl = options.valueOf(brokerOpt)
-    val topicCount = options.valueOf(topicsOpt).intValue
-    val zkUrl = options.valueOf(zkConnectOpt)
-    val sleepSecs = options.valueOf(sleepSecsOpt).intValue
-    
-    val testId = new Random().nextInt(Int.MaxValue)
-    val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray
-    
-    println("Producing %d messages...".format(messages))
-    val producedDataFile = produceMessages(brokerUrl, topics, messages, dups, percentDeletes)
-    println("Sleeping for %d seconds...".format(sleepSecs))
-    Thread.sleep(sleepSecs * 1000)
-    println("Consuming messages...")
-    val consumedDataFile = consumeMessages(zkUrl, topics)
-    
-    val producedLines = lineCount(producedDataFile)
-    val consumedLines = lineCount(consumedDataFile)
-    val reduction = 1.0 - consumedLines.toDouble/producedLines.toDouble
-    println("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).".format(producedLines, consumedLines, 100 * reduction))
-    
-    println("Deduplicating and validating output files...")
-    validateOutput(producedDataFile, consumedDataFile)
-    producedDataFile.delete()
-    consumedDataFile.delete()
-  }
-  
-  def dumpLog(dir: File) {
-    require(dir.exists, "Non-existent directory: " + dir.getAbsolutePath)
-    for(file <- dir.list.sorted; if file.endsWith(Log.LogFileSuffix)) {
-      val ms = new FileMessageSet(new File(dir, file))
-      for(entry <- ms) {
-        val key = Utils.readString(entry.message.key)
-        val content = 
-          if(entry.message.isNull)
-            null
-          else
-            Utils.readString(entry.message.payload)
-        println("offset = %s, key = %s, content = %s".format(entry.offset, key, content))
-      }
-    }
-  }
-  
-  def lineCount(file: File): Int = io.Source.fromFile(file).getLines.size
-  
-  def validateOutput(producedDataFile: File, consumedDataFile: File) {
-    val producedReader = externalSort(producedDataFile)
-    val consumedReader = externalSort(consumedDataFile)
-    val produced = valuesIterator(producedReader)
-    val consumed = valuesIterator(consumedReader)
-    val producedDedupedFile = new File(producedDataFile.getAbsolutePath + ".deduped")
-    val producedDeduped = new BufferedWriter(new FileWriter(producedDedupedFile), 1024*1024)
-    val consumedDedupedFile = new File(consumedDataFile.getAbsolutePath + ".deduped")
-    val consumedDeduped = new BufferedWriter(new FileWriter(consumedDedupedFile), 1024*1024)
-    var total = 0
-    var mismatched = 0
-    while(produced.hasNext && consumed.hasNext) {
-      val p = produced.next()
-      producedDeduped.write(p.toString)
-      producedDeduped.newLine()
-      val c = consumed.next()
-      consumedDeduped.write(c.toString)
-      consumedDeduped.newLine()
-      if(p != c)
-        mismatched += 1
-      total += 1
-    }
-    producedDeduped.close()
-    consumedDeduped.close()
-    require(!produced.hasNext, "Additional values produced not found in consumer log.")
-    require(!consumed.hasNext, "Additional values consumed not found in producer log.")
-    println("Validated " + total + " values, " + mismatched + " mismatches.")
-    require(mismatched == 0, "Non-zero number of row mismatches.")
-    // if all the checks worked out we can delete the deduped files
-    producedDedupedFile.delete()
-    consumedDedupedFile.delete()
-  }
-  
-  def valuesIterator(reader: BufferedReader) = {
-    new IteratorTemplate[TestRecord] {
-      def makeNext(): TestRecord = {
-        var next = readNext(reader)
-        while(next != null && next.delete)
-          next = readNext(reader)
-        if(next == null)
-          allDone()
-        else
-          next
-      }
-    }
-  }
-  
-  def readNext(reader: BufferedReader): TestRecord = {
-    var line = reader.readLine()
-    if(line == null)
-      return null
-    var curr = new TestRecord(line)
-    while(true) {
-      line = peekLine(reader)
-      if(line == null)
-        return curr
-      val next = new TestRecord(line)
-      if(next == null || next.topicAndKey != curr.topicAndKey)
-        return curr
-      curr = next
-      reader.readLine()
-    }
-    null
-  }
-  
-  def peekLine(reader: BufferedReader) = {
-    reader.mark(4096)
-    val line = reader.readLine
-    reader.reset()
-    line
-  }
-  
-  def externalSort(file: File): BufferedReader = {
-    val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", "--temporary-directory=" + System.getProperty("java.io.tmpdir"), file.getAbsolutePath)
-    val process = builder.start()
-    new Thread() {
-      override def run() {
-        val exitCode = process.waitFor()
-        if(exitCode != 0) {
-          System.err.println("Process exited abnormally.")
-          while(process.getErrorStream.available > 0) {
-            System.err.write(process.getErrorStream().read())
-          }
-        }
-      }
-    }.start()
-    new BufferedReader(new InputStreamReader(process.getInputStream()), 10*1024*1024)
-  }
-  
-  def produceMessages(brokerUrl: String, 
-                      topics: Array[String], 
-                      messages: Long, 
-                      dups: Int,
-                      percentDeletes: Int): File = {
-    val producerProps = new Properties
-    producerProps.setProperty("producer.type", "async")
-    producerProps.setProperty("metadata.broker.list", brokerUrl)
-    producerProps.setProperty("serializer.class", classOf[StringEncoder].getName)
-    producerProps.setProperty("key.serializer.class", classOf[StringEncoder].getName)
-    producerProps.setProperty("queue.enqueue.timeout.ms", "-1")
-    producerProps.setProperty("batch.num.messages", 1000.toString)
-    val producer = new Producer[String, String](new ProducerConfig(producerProps))
-    val rand = new Random(1)
-    val keyCount = (messages / dups).toInt
-    val producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt")
-    println("Logging produce requests to " + producedFile.getAbsolutePath)
-    val producedWriter = new BufferedWriter(new FileWriter(producedFile), 1024*1024)
-    for(i <- 0L until (messages * topics.length)) {
-      val topic = topics((i % topics.length).toInt)
-      val key = rand.nextInt(keyCount)
-      val delete = i % 100 < percentDeletes
-      val msg = 
-        if(delete)
-          new KeyedMessage[String, String](topic = topic, key = key.toString, message = null)
-        else
-          new KeyedMessage[String, String](topic = topic, key = key.toString, message = i.toString)
-      producer.send(msg)
-      producedWriter.write(TestRecord(topic, key, i, delete).toString)
-      producedWriter.newLine()
-    }
-    producedWriter.close()
-    producer.close()
-    producedFile
-  }
-  
-  def makeConsumer(zkUrl: String, topics: Array[String]): ZookeeperConsumerConnector = {
-    val consumerProps = new Properties
-    consumerProps.setProperty("group.id", "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
-    consumerProps.setProperty("zookeeper.connect", zkUrl)
-    consumerProps.setProperty("consumer.timeout.ms", (20*1000).toString)
-    consumerProps.setProperty("auto.offset.reset", "smallest")
-    new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps))
-  }
-  
-  def consumeMessages(zkUrl: String, topics: Array[String]): File = {
-    val connector = makeConsumer(zkUrl, topics)
-    val streams = connector.createMessageStreams(topics.map(topic => (topic, 1)).toMap, new StringDecoder, new StringDecoder)
-    val consumedFile = File.createTempFile("kafka-log-cleaner-consumed-", ".txt")
-    println("Logging consumed messages to " + consumedFile.getAbsolutePath)
-    val consumedWriter = new BufferedWriter(new FileWriter(consumedFile))
-    for(topic <- topics) {
-      val stream = streams(topic).head
-      try {
-        for(item <- stream) {
-          val delete = item.message == null
-          val value = if(delete) -1L else item.message.toLong
-          consumedWriter.write(TestRecord(topic, item.key.toInt, value, delete).toString)
-          consumedWriter.newLine()
-        }
-      } catch {
-        case e: ConsumerTimeoutException => 
-      }
-    }
-    consumedWriter.close()
-    connector.shutdown()
-    consumedFile
-  }
-  
-}
-
-case class TestRecord(val topic: String, val key: Int, val value: Long, val delete: Boolean) {
-  def this(pieces: Array[String]) = this(pieces(0), pieces(1).toInt, pieces(2).toLong, pieces(3) == "d")
-  def this(line: String) = this(line.split("\t"))
-  override def toString() = topic + "\t" +  key + "\t" + value + "\t" + (if(delete) "d" else "u")
-  def topicAndKey = topic + key
-}
\ No newline at end of file
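
A rough sanity check for the "% reduction" figure that TestLogCleaning prints: produceMessages draws keys from a space of messages / duplicates distinct keys, so a fully compacted log retains on the order of one record per key. A minimal sketch of that estimate in Scala (illustrative option values; the real reduction also depends on --percent-deletes and on how far the cleaner has progressed by the time consumption starts):

    // Illustrative --messages and --duplicates values, not a real run.
    val messages = 1000000L
    val dups     = 5
    val keyCount = (messages / dups).toInt                // distinct keys, as in produceMessages
    val expected = 1.0 - keyCount.toDouble / messages     // roughly 0.8, i.e. ~80% reduction
    println("expected reduction ~ %.1f%%".format(100 * expected))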