Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/11 05:59:43 UTC

[4/4] git commit: KAFKA-1236 Fix various breakages in the perf tests. Make the producer test use either the old or the new producer.

KAFKA-1236 Fix various breakages in the perf tests. Make the producer test use either the old or the new producer.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6b80dbb9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6b80dbb9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6b80dbb9

Branch: refs/heads/trunk
Commit: 6b80dbb97b3d9b533f60d5c09639ea6c7c5f2be5
Parents: 02bb382
Author: Jay Kreps <ja...@gmail.com>
Authored: Sun Feb 9 15:16:51 2014 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Mon Feb 10 20:49:08 2014 -0800

----------------------------------------------------------------------
 bin/kafka-run-class.sh                          |   8 +-
 .../clients/producer/internals/Sender.java      |  27 ++-
 .../clients/tools/ProducerPerformance.java      |  19 +-
 .../scala/other/kafka/TestEndToEndLatency.scala |  22 +-
 .../scala/kafka/perf/ConsumerPerformance.scala  | 105 +++++----
 .../scala/kafka/perf/ProducerPerformance.scala  | 216 +++++++++++--------
 project/Build.scala                             |   2 +-
 7 files changed, 213 insertions(+), 186 deletions(-)
----------------------------------------------------------------------
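
For orientation before the per-file diffs: the heart of the change is a small
producer facade in the Scala perf tool, so the benchmark thread can drive
either client through one interface. A condensed sketch of that pattern (the
full version appears in the perf/src/main/scala/kafka/perf/ProducerPerformance.scala
diff below; the trait, class, and flag names are the patch's own):

    // Common facade over the legacy Scala producer and the new Java client.
    trait Producer {
      def send(topic: String, partition: Long, bytes: Array[Byte])
      def close()
    }

    // Each benchmark thread picks an implementation based on the new
    // --new-producer command-line flag (condensed from the diff below).
    def makeProducer(config: ProducerPerfConfig): Producer =
      if (config.useNewProducer) new NewShinyProducer(config)
      else new OldRustyProducer(config)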


http://git-wip-us.apache.org/repos/asf/kafka/blob/6b80dbb9/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 75a3fc4..e0c31ee 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -50,22 +50,22 @@ do
   CLASSPATH=$CLASSPATH:$file
 done
 
-for file in $base_dir/clients/build/libs//kafka-clients*.jar;
+for file in $base_dir/examples/build/libs//kafka-examples*.jar;
 do
   CLASSPATH=$CLASSPATH:$file
 done
 
-for file in $base_dir/examples/build/libs//kafka-examples*.jar;
+for file in $base_dir/contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar;
 do
   CLASSPATH=$CLASSPATH:$file
 done
 
-for file in $base_dir/contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar;
+for file in $base_dir/contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar;
 do
   CLASSPATH=$CLASSPATH:$file
 done
 
-for file in $base_dir/contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar;
+for file in $base_dir/clients/target/scala-${SCALA_VERSION}/clients*.jar;
 do
   CLASSPATH=$CLASSPATH:$file
 done

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b80dbb9/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 3e10e32..e8c194c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.clients.producer.internals;
 
@@ -152,7 +148,7 @@ public class Sender implements Runnable {
 
         // do the I/O
         try {
-            this.selector.poll(5L, sends);
+            this.selector.poll(100L, sends);
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -189,6 +185,7 @@ public class Sender implements Runnable {
     public void initiateClose() {
         this.running = false;
         this.accumulator.close();
+        this.wakeup();
     }
 
     /**

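Two behavioral notes on the Sender change above: the poll timeout grows from
5 ms to 100 ms so an idle I/O thread spins far less, and initiateClose() now
calls wakeup() so a thread parked inside poll() notices running == false
immediately instead of only after the timeout expires. A minimal sketch of
that shutdown pattern using the plain java.nio Selector (a hypothetical class,
not the actual Sender internals):

    import java.nio.channels.Selector

    class IoLoop extends Runnable {
      private val selector = Selector.open()
      @volatile private var running = true

      override def run(): Unit = {
        while (running)
          selector.select(100L) // analogous to selector.poll(100L, sends)
        selector.close()
      }

      def initiateClose(): Unit = {
        running = false
        selector.wakeup() // without this, close can stall for a full poll interval
      }
    }
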
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b80dbb9/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index 108d61e..3ebbb80 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -26,22 +26,25 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.record.Records;
 
-
 public class ProducerPerformance {
 
     public static void main(String[] args) throws Exception {
-        if (args.length != 3) {
-            System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url num_records record_size");
+        if (args.length != 5) {
+            System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url topic_name num_records record_size acks");
             System.exit(1);
         }
         String url = args[0];
-        int numRecords = Integer.parseInt(args[1]);
-        int recordSize = Integer.parseInt(args[2]);
+        String topicName = args[1];
+        int numRecords = Integer.parseInt(args[2]);
+        int recordSize = Integer.parseInt(args[3]);
+        int acks = Integer.parseInt(args[4]);
         Properties props = new Properties();
-        props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, "1");
+        props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, Integer.toString(acks));
         props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, url);
         props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000));
         props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
+        props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(256 * 1024 * 1024));
+        props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, Integer.toString(256 * 1024));
 
         KafkaProducer producer = new KafkaProducer(props);
         Callback callback = new Callback() {
@@ -52,7 +55,7 @@ public class ProducerPerformance {
         };
         byte[] payload = new byte[recordSize];
         Arrays.fill(payload, (byte) 1);
-        ProducerRecord record = new ProducerRecord("test", payload);
+        ProducerRecord record = new ProducerRecord(topicName, payload);
         long start = System.currentTimeMillis();
         long maxLatency = -1L;
         long totalLatency = 0;
@@ -75,8 +78,8 @@ public class ProducerPerformance {
         long ellapsed = System.currentTimeMillis() - start;
         double msgsSec = 1000.0 * numRecords / (double) ellapsed;
         double mbSec = msgsSec * (recordSize + Records.LOG_OVERHEAD) / (1024.0 * 1024.0);
-        System.out.printf("%d records sent in %d ms ms. %.2f records per second (%.2f mb/sec).", numRecords, ellapsed, msgsSec, mbSec);
         producer.close();
+        System.out.printf("%d records sent in %d ms ms. %.2f records per second (%.2f mb/sec).\n", numRecords, ellapsed, msgsSec, mbSec);
     }
 
 }

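The Java perf tool now takes five positional arguments instead of three,
honors a caller-supplied acks value, sizes its buffer memory explicitly, and
defers printing the summary line until the producer has been closed. A
hypothetical smoke test matching the new usage string (the broker address and
topic are placeholder values):

    // args per the updated USAGE line: url topic_name num_records record_size acks
    org.apache.kafka.clients.tools.ProducerPerformance.main(
      Array("localhost:9092", "perf-test", "1000000", "100", "1"))
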
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b80dbb9/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
index c4aed10..f5d39dd 100644
--- a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
+++ b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -24,23 +24,23 @@ import kafka.message._
 
 object TestEndToEndLatency {
   def main(args: Array[String]) {
-    if(args.length != 3) {
-      System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect num_messages")
+    if (args.length != 4) {
+      System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages")
       System.exit(1)
     }
 
     val brokerList = args(0)
     val zkConnect = args(1)
-    val numMessages = args(2).toInt
-    val topic = "test"
-    
+    val topic = args(2)
+    val numMessages = args(3).toInt
+
     val consumerProps = new Properties()
     consumerProps.put("group.id", topic)
     consumerProps.put("auto.commit", "true")
     consumerProps.put("auto.offset.reset", "largest")
     consumerProps.put("zookeeper.connect", zkConnect)
     consumerProps.put("socket.timeout.ms", 1201000.toString)
-    
+
     val config = new ConsumerConfig(consumerProps)
     val connector = Consumer.create(config)
     var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
@@ -50,16 +50,16 @@ object TestEndToEndLatency {
     producerProps.put("metadata.broker.list", brokerList)
     producerProps.put("producer.type", "sync")
     val producer = new Producer[Any, Any](new ProducerConfig(producerProps))
-    
-    val message = new Message("hello there beautiful".getBytes)
+
+    val message = "hello there beautiful".getBytes
     var totalTime = 0.0
-    for(i <- 0 until numMessages) {
+    for (i <- 0 until numMessages) {
       var begin = System.nanoTime
       producer.send(new KeyedMessage(topic, message))
       val received = iter.next
       val elapsed = System.nanoTime - begin
       // poor man's progress bar
-      if(i % 10000 == 0)
+      if (i % 1000 == 0)
         println(i + "\t" + elapsed / 1000.0 / 1000.0)
       totalTime += elapsed
     }

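The latency harness above now takes the topic as an argument, sends raw byte
arrays rather than wrapping them in a Message (raw bytes are what the default
encoder expects, so this is presumably one of the breakages being fixed), and
prints progress every 1,000 messages instead of every 10,000. Its round-trip
measurement, in miniature (a sketch; send and receive stand in for the
producer call and the consumer iterator):

    // Measure one produce -> consume round trip in milliseconds.
    def roundTripMs(send: () => Unit, receive: () => Unit): Double = {
      val begin = System.nanoTime
      send()
      receive()
      (System.nanoTime - begin) / 1000.0 / 1000.0
    }
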
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b80dbb9/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
index 55ee01b..4dde468 100644
--- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
@@ -23,7 +23,7 @@ import java.nio.channels.ClosedByInterruptException
 import org.apache.log4j.Logger
 import kafka.message.Message
 import kafka.utils.ZkUtils
-import java.util.{Random, Properties}
+import java.util.{ Random, Properties }
 import kafka.consumer._
 import java.text.SimpleDateFormat
 
@@ -40,8 +40,8 @@ object ConsumerPerformance {
     var totalMessagesRead = new AtomicLong(0)
     var totalBytesRead = new AtomicLong(0)
 
-    if(!config.hideHeader) {
-      if(!config.showDetailedStats)
+    if (!config.hideHeader) {
+      if (!config.showDetailedStats)
         println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
       else
         println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
@@ -57,7 +57,7 @@ object ConsumerPerformance {
     for ((topic, streamList) <- topicMessageStreams)
       for (i <- 0 until streamList.length)
         threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config,
-                                              totalMessagesRead, totalBytesRead)
+          totalMessagesRead, totalBytesRead)
 
     logger.info("Sleeping for 1 second.")
     Thread.sleep(1000)
@@ -67,61 +67,61 @@ object ConsumerPerformance {
       thread.start
 
     for (thread <- threadList)
-      thread.shutdown
+      thread.join
 
     val endMs = System.currentTimeMillis
     val elapsedSecs = (endMs - startMs - config.consumerConfig.consumerTimeoutMs) / 1000.0
-    if(!config.showDetailedStats) {
-      val totalMBRead = (totalBytesRead.get*1.0)/(1024*1024)
+    if (!config.showDetailedStats) {
+      val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024)
       println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
-        config.consumerConfig.fetchMessageMaxBytes, totalMBRead, totalMBRead/elapsedSecs, totalMessagesRead.get,
-        totalMessagesRead.get/elapsedSecs))
+        config.consumerConfig.fetchMessageMaxBytes, totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get,
+        totalMessagesRead.get / elapsedSecs))
     }
     System.exit(0)
   }
 
   class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
-                                      "Multiple URLS can be given to allow fail-over.")
-                           .withRequiredArg
-                           .describedAs("urls")
-                           .ofType(classOf[String])
+      "Multiple URLS can be given to allow fail-over.")
+      .withRequiredArg
+      .describedAs("urls")
+      .ofType(classOf[String])
     val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
       .withRequiredArg
       .describedAs("topic")
       .ofType(classOf[String])
     val groupIdOpt = parser.accepts("group", "The group id to consume on.")
-                           .withRequiredArg
-                           .describedAs("gid")
-                           .defaultsTo("perf-consumer-" + new Random().nextInt(100000))
-                           .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("gid")
+      .defaultsTo("perf-consumer-" + new Random().nextInt(100000))
+      .ofType(classOf[String])
     val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
-                           .withRequiredArg
-                           .describedAs("size")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1024 * 1024)
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1024 * 1024)
     val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
       "offset to consume from, start with the latest message present in the log rather than the earliest message.")
     val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
-                           .withRequiredArg
-                           .describedAs("size")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(2 * 1024 * 1024)
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(2 * 1024 * 1024)
     val numThreadsOpt = parser.accepts("threads", "Number of processing threads.")
-                           .withRequiredArg
-                           .describedAs("count")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(10)
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(10)
     val numFetchersOpt = parser.accepts("num-fetch-threads", "Number of fetcher threads.")
-                               .withRequiredArg
-                               .describedAs("count")
-                               .ofType(classOf[java.lang.Integer])
-                               .defaultsTo(1)
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
 
-    val options = parser.parse(args : _*)
+    val options = parser.parse(args: _*)
 
-    for(arg <- List(topicOpt, zkConnectOpt)) {
-      if(!options.has(arg)) {
+    for (arg <- List(topicOpt, zkConnectOpt)) {
+      if (!options.has(arg)) {
         System.err.println("Missing required argument \"" + arg + "\"")
         parser.printHelpOn(System.err)
         System.exit(1)
@@ -132,7 +132,7 @@ object ConsumerPerformance {
     props.put("group.id", options.valueOf(groupIdOpt))
     props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
     props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
-    props.put("auto.offset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
+    props.put("auto.offset.reset", if (options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
     props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
     props.put("consumer.timeout.ms", "5000")
     props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString)
@@ -147,13 +147,8 @@ object ConsumerPerformance {
   }
 
   class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]],
-                           config:ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong)
+    config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong)
     extends Thread(name) {
-    private val shutdownLatch = new CountDownLatch(1)
-
-    def shutdown(): Unit = {
-      shutdownLatch.await
-    }
 
     override def run() {
       var bytesRead = 0L
@@ -164,43 +159,41 @@ object ConsumerPerformance {
       var lastMessagesRead = 0L
 
       try {
-        for (messageAndMetadata <- stream if messagesRead < config.numMessages) {
+        val iter = stream.iterator
+        while (iter.hasNext && messagesRead < config.numMessages) {
+          val messageAndMetadata = iter.next
           messagesRead += 1
           bytesRead += messageAndMetadata.message.length
 
           if (messagesRead % config.reportingInterval == 0) {
-            if(config.showDetailedStats)
+            if (config.showDetailedStats)
               printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis)
             lastReportTime = System.currentTimeMillis
             lastMessagesRead = messagesRead
             lastBytesRead = bytesRead
           }
         }
-      }
-      catch {
+      } catch {
         case _: InterruptedException =>
         case _: ClosedByInterruptException =>
         case _: ConsumerTimeoutException =>
-        case e: Throwable => throw e
+        case e: Throwable => e.printStackTrace()
       }
       totalMessagesRead.addAndGet(messagesRead)
       totalBytesRead.addAndGet(bytesRead)
-      if(config.showDetailedStats)
+      if (config.showDetailedStats)
         printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis)
-      shutdownComplete
     }
 
     private def printMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long,
-                             startMs: Long, endMs: Long) = {
+      startMs: Long, endMs: Long) = {
       val elapsedMs = endMs - startMs
-      val totalMBRead = (bytesRead*1.0)/(1024*1024)
-      val mbRead = ((bytesRead - lastBytesRead)*1.0)/(1024*1024)
+      val totalMBRead = (bytesRead * 1.0) / (1024 * 1024)
+      val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
       println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id,
         config.consumerConfig.fetchMessageMaxBytes, totalMBRead,
-        1000.0*(mbRead/elapsedMs), messagesRead, ((messagesRead - lastMessagesRead)/elapsedMs)*1000.0))
+        1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0))
     }
-
-    private def shutdownComplete() = shutdownLatch.countDown
   }
 
 }

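Two functional fixes hide in the ConsumerPerformance reformatting: the old
shutdown() actually blocked on a CountDownLatch that each thread counted down
on exit, and it is replaced by a plain Thread.join; and a Throwable is now
printed instead of rethrown, so the byte and message totals are still recorded
after a failure. The join pattern in miniature (a sketch, not the tool
itself):

    object JoinDemo {
      def main(args: Array[String]): Unit = {
        val threads = (0 until 4).map { i =>
          new Thread("kafka-zk-consumer-" + i) {
            override def run(): Unit = () // consume until timeout or message cap
          }
        }
        threads.foreach(_.start())
        threads.foreach(_.join()) // replaces the removed shutdownLatch handshake
      }
    }
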
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b80dbb9/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index ad2ac26..5d399d9 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -17,18 +17,19 @@
 
 package kafka.perf
 
-import java.util.concurrent.{CountDownLatch, Executors}
+import java.util.concurrent.{ CountDownLatch, Executors }
 import java.util.concurrent.atomic.AtomicLong
 import kafka.producer._
 import org.apache.log4j.Logger
-import kafka.message.{CompressionCodec, Message}
+import kafka.message.{ CompressionCodec, Message }
 import java.text.SimpleDateFormat
 import kafka.serializer._
 import java.util._
 import collection.immutable.List
-import kafka.utils.{VerifiableProperties, Logging}
+import kafka.utils.{ VerifiableProperties, Logging, Utils }
 import kafka.metrics.KafkaMetricsReporter
-
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
 
 /**
  * Load test for the producer
@@ -39,7 +40,7 @@ object ProducerPerformance extends Logging {
 
     val logger = Logger.getLogger(getClass)
     val config = new ProducerPerfConfig(args)
-    if(!config.isFixSize)
+    if (!config.isFixedSize)
       logger.info("WARN: Throughput will be slower due to changing message size per request")
 
     val totalBytesSent = new AtomicLong(0)
@@ -49,79 +50,80 @@ object ProducerPerformance extends Logging {
     val startMs = System.currentTimeMillis
     val rand = new java.util.Random
 
-    if(!config.hideHeader)
-        println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +
-                        "total.data.sent.in.nMsg, nMsg.sec")
+    if (!config.hideHeader)
+      println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +
+        "total.data.sent.in.nMsg, nMsg.sec")
 
-    for(i <- 0 until config.numThreads) {
+    for (i <- 0 until config.numThreads) {
       executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))
     }
 
     allDone.await()
     val endMs = System.currentTimeMillis
     val elapsedSecs = (endMs - startMs) / 1000.0
-    val totalMBSent = (totalBytesSent.get * 1.0)/ (1024 * 1024)
+    val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024)
     println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(
       config.dateFormat.format(startMs), config.dateFormat.format(endMs),
       config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent,
-      totalMBSent/elapsedSecs, totalMessagesSent.get, totalMessagesSent.get/elapsedSecs))
+      totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs))
     System.exit(0)
   }
 
   class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {
     val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info (the list of broker host and port for bootstrap.")
-            .withRequiredArg
-            .describedAs("hostname:port,..,hostname:port")
-            .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("hostname:port,..,hostname:port")
+      .ofType(classOf[String])
     val topicsOpt = parser.accepts("topics", "REQUIRED: The comma separated list of topics to produce to")
       .withRequiredArg
       .describedAs("topic1,topic2..")
       .ofType(classOf[String])
     val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
-            .withRequiredArg()
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(3000)
+      .withRequiredArg()
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(3000)
     val producerNumRetriesOpt = parser.accepts("producer-num-retries", "The producer retries number")
-            .withRequiredArg()
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(3)
+      .withRequiredArg()
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(3)
     val producerRetryBackOffMsOpt = parser.accepts("producer-retry-backoff-ms", "The producer retry backoff time in milliseconds")
-            .withRequiredArg()
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(100)
+      .withRequiredArg()
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(100)
     val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " +
-            "to complete")
-            .withRequiredArg()
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(-1)
+      "to complete")
+      .withRequiredArg()
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(-1)
     val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
     val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.")
     val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
-            .withRequiredArg
-            .describedAs("number of threads")
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(1)
+      .withRequiredArg
+      .describedAs("number of threads")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
     val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " +
-            "ID and sent by producer starting from this ID sequentially. Message content will be String type and " +
-            "in the form of 'Message:000...1:xxx...'")
-            .withRequiredArg()
-            .describedAs("initial message id")
-            .ofType(classOf[java.lang.Integer])
+      "ID and sent by producer starting from this ID sequentially. Message content will be String type and " +
+      "in the form of 'Message:000...1:xxx...'")
+      .withRequiredArg()
+      .describedAs("initial message id")
+      .ofType(classOf[java.lang.Integer])
     val messageSendGapMsOpt = parser.accepts("message-send-gap-ms", "If set, the send thread will wait for specified time between two sends")
-            .withRequiredArg()
-            .describedAs("message send time gap")
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(0)
+      .withRequiredArg()
+      .describedAs("message send time gap")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
     val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
     val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
-            "set, the csv metrics will be outputed here")
+      "set, the csv metrics will be outputed here")
       .withRequiredArg
       .describedAs("metrics dictory")
       .ofType(classOf[java.lang.String])
+    val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
 
-    val options = parser.parse(args : _*)
-    for(arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) {
-      if(!options.has(arg)) {
+    val options = parser.parse(args: _*)
+    for (arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) {
+      if (!options.has(arg)) {
         System.err.println("Missing required argument \"" + arg + "\"")
         parser.printHelpOn(System.err)
         System.exit(1)
@@ -135,19 +137,20 @@ object ProducerPerformance extends Logging {
     val hideHeader = options.has(hideHeaderOpt)
     val brokerList = options.valueOf(brokerListOpt)
     val messageSize = options.valueOf(messageSizeOpt).intValue
-    var isFixSize = !options.has(varyMessageSizeOpt)
+    var isFixedSize = !options.has(varyMessageSizeOpt)
     var isSync = options.has(syncOpt)
     var batchSize = options.valueOf(batchSizeOpt).intValue
     var numThreads = options.valueOf(numThreadsOpt).intValue
     val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue)
     val seqIdMode = options.has(initialMessageIdOpt)
     var initialMessageId: Int = 0
-    if(seqIdMode)
+    if (seqIdMode)
       initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
     val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue()
     val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()
     val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue()
     val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue()
+    val useNewProducer = options.has(useNewProducerOpt)
 
     val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
 
@@ -167,44 +170,80 @@ object ProducerPerformance extends Logging {
     val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue()
   }
 
-  class ProducerThread(val threadId: Int,
-                       val config: ProducerPerfConfig,
-                       val totalBytesSent: AtomicLong,
-                       val totalMessagesSent: AtomicLong,
-                       val allDone: CountDownLatch,
-                       val rand: Random) extends Runnable {
+  trait Producer {
+    def send(topic: String, partition: Long, bytes: Array[Byte])
+    def close()
+  }
+
+  class OldRustyProducer(config: ProducerPerfConfig) extends Producer {
     val props = new Properties()
     props.put("metadata.broker.list", config.brokerList)
     props.put("compression.codec", config.compressionCodec.codec.toString)
-    props.put("reconnect.interval", Integer.MAX_VALUE.toString)
-    props.put("send.buffer.bytes", (64*1024).toString)
-    if(!config.isSync) {
-      props.put("producer.type","async")
+    props.put("send.buffer.bytes", (64 * 1024).toString)
+    if (!config.isSync) {
+      props.put("producer.type", "async")
       props.put("batch.num.messages", config.batchSize.toString)
       props.put("queue.enqueue.timeout.ms", "-1")
     }
-    props.put("client.id", "ProducerPerformance")
+    props.put("client.id", "perf-test")
     props.put("request.required.acks", config.producerRequestRequiredAcks.toString)
     props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
     props.put("message.send.max.retries", config.producerNumRetries.toString)
     props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString)
     props.put("serializer.class", classOf[DefaultEncoder].getName.toString)
     props.put("key.serializer.class", classOf[NullEncoder[Long]].getName.toString)
+    val producer = new kafka.producer.Producer[Long, Array[Byte]](new ProducerConfig(props))
 
-    
-    val producerConfig = new ProducerConfig(props)
-    val producer = new Producer[Long, Array[Byte]](producerConfig)
-    val seqIdNumDigit = 10   // no. of digits for max int value
+    def send(topic: String, partition: Long, bytes: Array[Byte]) {
+      this.producer.send(new KeyedMessage[Long, Array[Byte]](topic, partition, bytes))
+    }
+
+    def close() {
+      this.producer.close()
+    }
+  }
+
+  class NewShinyProducer(config: ProducerPerfConfig) extends Producer {
+    val props = new Properties()
+    props.put("metadata.broker.list", config.brokerList)
+    props.put("send.buffer.bytes", (64 * 1024).toString)
+    props.put("client.id", "perf-test")
+    props.put("request.required.acks", config.producerRequestRequiredAcks.toString)
+    props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
+    val producer = new KafkaProducer(props)
+
+    def send(topic: String, partition: Long, bytes: Array[Byte]) {
+      val part = partition % this.producer.partitionsFor(topic).size
+      this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes))
+    }
+
+    def close() {
+      this.producer.close()
+    }
+  }
+
+  class ProducerThread(val threadId: Int,
+    val config: ProducerPerfConfig,
+    val totalBytesSent: AtomicLong,
+    val totalMessagesSent: AtomicLong,
+    val allDone: CountDownLatch,
+    val rand: Random) extends Runnable {
+    val seqIdNumDigit = 10 // no. of digits for max int value
 
     val messagesPerThread = config.numMessages / config.numThreads
     debug("Messages per thread = " + messagesPerThread)
+    val producer =
+      if (config.useNewProducer)
+        new NewShinyProducer(config)
+      else
+        new OldRustyProducer(config)
 
     // generate the sequential message ID
-    private val SEP            = ":"              // message field separator
+    private val SEP = ":" // message field separator
     private val messageIdLabel = "MessageID"
-    private val threadIdLabel  = "ThreadID"
-    private val topicLabel     = "Topic"
-    private var leftPaddedSeqId : String = ""
+    private val threadIdLabel = "ThreadID"
+    private val topicLabel = "Topic"
+    private var leftPaddedSeqId: String = ""
 
     private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
       // Each thread gets a unique range of sequential no. for its ids.
@@ -213,48 +252,43 @@ object ProducerPerformance extends Logging {
       // thread 1 IDs : 100 ~ 199
       // thread 2 IDs : 200 ~ 299
       // . . .
-      leftPaddedSeqId = String.format("%0"+seqIdNumDigit+"d", long2Long(msgId))
+      leftPaddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId))
 
-      val msgHeader = topicLabel      + SEP +
-              topic           + SEP +
-              threadIdLabel   + SEP +
-              threadId        + SEP +
-              messageIdLabel  + SEP +
-              leftPaddedSeqId + SEP
+      val msgHeader = topicLabel + SEP +
+        topic + SEP +
+        threadIdLabel + SEP +
+        threadId + SEP +
+        messageIdLabel + SEP +
+        leftPaddedSeqId + SEP
 
-      val seqMsgString = String.format("%1$-"+msgSize+"s", msgHeader).replace(' ', 'x')
+      val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x')
       debug(seqMsgString)
       return seqMsgString.getBytes()
     }
 
-    private def generateProducerData(topic: String, messageId: Long): (KeyedMessage[Long, Array[Byte]], Int) = {
-      val msgSize = if(config.isFixSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
-      val message =
-        if(config.seqIdMode) {
-          val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
-          generateMessageWithSeqId(topic, seqId, msgSize)
-        } else {
-          new Array[Byte](msgSize)
-        }
-      (new KeyedMessage[Long, Array[Byte]](topic, messageId, message), message.length)
+    private def generateProducerData(topic: String, messageId: Long): Array[Byte] = {
+      val msgSize = if (config.isFixedSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
+      if (config.seqIdMode) {
+        val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
+        generateMessageWithSeqId(topic, seqId, msgSize)
+      } else {
+        new Array[Byte](msgSize)
+      }
     }
 
     override def run {
       var bytesSent = 0L
       var nSends = 0
       var j: Long = 0L
-      while(j < messagesPerThread) {
+      while (j < messagesPerThread) {
         try {
           config.topics.foreach(
             topic => {
-              val (producerData, bytesSent_) = generateProducerData(topic, j)
-              bytesSent += bytesSent_
-              producer.send(producerData)
+              producer.send(topic, j, generateProducerData(topic, j))
               nSends += 1
-              if(config.messageSendGapMs > 0)
+              if (config.messageSendGapMs > 0)
                 Thread.sleep(config.messageSendGapMs)
-            }
-          )
+            })
         } catch {
           case e: Exception => error("Error sending messages", e)
         }

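One subtlety in NewShinyProducer above: the new client addresses a concrete
partition index, so the per-thread message counter is folded onto the topic's
live partition count (via partitionsFor) and run through Utils.abs so the
Long-to-Int conversion cannot go negative. The same mapping in isolation (a
sketch using math.abs; the patch itself uses kafka.utils.Utils.abs):

    // Map a monotonically increasing message counter onto a valid partition id.
    def targetPartition(messageCounter: Long, numPartitions: Int): Int =
      math.abs((messageCounter % numPartitions).toInt)
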
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b80dbb9/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index ddcfc41..12d84f8 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -140,7 +140,7 @@ object KafkaBuild extends Build {
 
   lazy val kafka    = Project(id = "Kafka", base = file(".")).aggregate(core, examples, contrib, perf).settings((commonSettings ++
     runRatTask ++ releaseTask ++ releaseZipTask ++ releaseTarTask): _*)
-  lazy val core     = Project(id = "core", base = file("core")).settings(commonSettings: _*)
+  lazy val core     = Project(id = "core", base = file("core")).settings(commonSettings: _*) dependsOn(clients)
   lazy val examples = Project(id = "java-examples", base = file("examples")).settings(commonSettings :_*) dependsOn (core)
   lazy val perf     = Project(id = "perf", base = file("perf")).settings((Seq(name := "kafka-perf") ++ commonSettings):_*) dependsOn (core)