You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/11 05:59:43 UTC
[4/4] git commit: KAFKA-1236 Fix various breakages in the perf tests.
Make the producer test use either the old or the new producer.
KAFKA-1236 Fix various breakages in the perf tests. Make the producer test use either the old or the new producer.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6b80dbb9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6b80dbb9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6b80dbb9
Branch: refs/heads/trunk
Commit: 6b80dbb97b3d9b533f60d5c09639ea6c7c5f2be5
Parents: 02bb382
Author: Jay Kreps <ja...@gmail.com>
Authored: Sun Feb 9 15:16:51 2014 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Mon Feb 10 20:49:08 2014 -0800
----------------------------------------------------------------------
bin/kafka-run-class.sh | 8 +-
.../clients/producer/internals/Sender.java | 27 ++-
.../clients/tools/ProducerPerformance.java | 19 +-
.../scala/other/kafka/TestEndToEndLatency.scala | 22 +-
.../scala/kafka/perf/ConsumerPerformance.scala | 105 +++++----
.../scala/kafka/perf/ProducerPerformance.scala | 216 +++++++++++--------
project/Build.scala | 2 +-
7 files changed, 213 insertions(+), 186 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b80dbb9/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 75a3fc4..e0c31ee 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -50,22 +50,22 @@ do
CLASSPATH=$CLASSPATH:$file
done
-for file in $base_dir/clients/build/libs//kafka-clients*.jar;
+for file in $base_dir/examples/build/libs//kafka-examples*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
-for file in $base_dir/examples/build/libs//kafka-examples*.jar;
+for file in $base_dir/contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
-for file in $base_dir/contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar;
+for file in $base_dir/contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
-for file in $base_dir/contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar;
+for file in $base_dir/clients/target/scala-${SCALA_VERSION}/clients*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b80dbb9/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 3e10e32..e8c194c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.producer.internals;
@@ -152,7 +148,7 @@ public class Sender implements Runnable {
// do the I/O
try {
- this.selector.poll(5L, sends);
+ this.selector.poll(100L, sends);
} catch (IOException e) {
e.printStackTrace();
}
@@ -189,6 +185,7 @@ public class Sender implements Runnable {
public void initiateClose() {
this.running = false;
this.accumulator.close();
+ this.wakeup();
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b80dbb9/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index 108d61e..3ebbb80 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -26,22 +26,25 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.record.Records;
-
public class ProducerPerformance {
public static void main(String[] args) throws Exception {
- if (args.length != 3) {
- System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url num_records record_size");
+ if (args.length != 5) {
+ System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url topic_name num_records record_size acks");
System.exit(1);
}
String url = args[0];
- int numRecords = Integer.parseInt(args[1]);
- int recordSize = Integer.parseInt(args[2]);
+ String topicName = args[1];
+ int numRecords = Integer.parseInt(args[2]);
+ int recordSize = Integer.parseInt(args[3]);
+ int acks = Integer.parseInt(args[4]);
Properties props = new Properties();
- props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, "1");
+ props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, Integer.toString(acks));
props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, url);
props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000));
props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
+ props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(256 * 1024 * 1024));
+ props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, Integer.toString(256 * 1024));
KafkaProducer producer = new KafkaProducer(props);
Callback callback = new Callback() {
@@ -52,7 +55,7 @@ public class ProducerPerformance {
};
byte[] payload = new byte[recordSize];
Arrays.fill(payload, (byte) 1);
- ProducerRecord record = new ProducerRecord("test", payload);
+ ProducerRecord record = new ProducerRecord(topicName, payload);
long start = System.currentTimeMillis();
long maxLatency = -1L;
long totalLatency = 0;
@@ -75,8 +78,8 @@ public class ProducerPerformance {
long ellapsed = System.currentTimeMillis() - start;
double msgsSec = 1000.0 * numRecords / (double) ellapsed;
double mbSec = msgsSec * (recordSize + Records.LOG_OVERHEAD) / (1024.0 * 1024.0);
- System.out.printf("%d records sent in %d ms ms. %.2f records per second (%.2f mb/sec).", numRecords, ellapsed, msgsSec, mbSec);
producer.close();
+ System.out.printf("%d records sent in %d ms ms. %.2f records per second (%.2f mb/sec).\n", numRecords, ellapsed, msgsSec, mbSec);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b80dbb9/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
index c4aed10..f5d39dd 100644
--- a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
+++ b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -24,23 +24,23 @@ import kafka.message._
object TestEndToEndLatency {
def main(args: Array[String]) {
- if(args.length != 3) {
- System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect num_messages")
+ if (args.length != 4) {
+ System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages")
System.exit(1)
}
val brokerList = args(0)
val zkConnect = args(1)
- val numMessages = args(2).toInt
- val topic = "test"
-
+ val topic = args(2)
+ val numMessages = args(3).toInt
+
val consumerProps = new Properties()
consumerProps.put("group.id", topic)
consumerProps.put("auto.commit", "true")
consumerProps.put("auto.offset.reset", "largest")
consumerProps.put("zookeeper.connect", zkConnect)
consumerProps.put("socket.timeout.ms", 1201000.toString)
-
+
val config = new ConsumerConfig(consumerProps)
val connector = Consumer.create(config)
var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
@@ -50,16 +50,16 @@ object TestEndToEndLatency {
producerProps.put("metadata.broker.list", brokerList)
producerProps.put("producer.type", "sync")
val producer = new Producer[Any, Any](new ProducerConfig(producerProps))
-
- val message = new Message("hello there beautiful".getBytes)
+
+ val message = "hello there beautiful".getBytes
var totalTime = 0.0
- for(i <- 0 until numMessages) {
+ for (i <- 0 until numMessages) {
var begin = System.nanoTime
producer.send(new KeyedMessage(topic, message))
val received = iter.next
val elapsed = System.nanoTime - begin
// poor man's progress bar
- if(i % 10000 == 0)
+ if (i % 1000 == 0)
println(i + "\t" + elapsed / 1000.0 / 1000.0)
totalTime += elapsed
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b80dbb9/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
index 55ee01b..4dde468 100644
--- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
@@ -23,7 +23,7 @@ import java.nio.channels.ClosedByInterruptException
import org.apache.log4j.Logger
import kafka.message.Message
import kafka.utils.ZkUtils
-import java.util.{Random, Properties}
+import java.util.{ Random, Properties }
import kafka.consumer._
import java.text.SimpleDateFormat
@@ -40,8 +40,8 @@ object ConsumerPerformance {
var totalMessagesRead = new AtomicLong(0)
var totalBytesRead = new AtomicLong(0)
- if(!config.hideHeader) {
- if(!config.showDetailedStats)
+ if (!config.hideHeader) {
+ if (!config.showDetailedStats)
println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
else
println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
@@ -57,7 +57,7 @@ object ConsumerPerformance {
for ((topic, streamList) <- topicMessageStreams)
for (i <- 0 until streamList.length)
threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config,
- totalMessagesRead, totalBytesRead)
+ totalMessagesRead, totalBytesRead)
logger.info("Sleeping for 1 second.")
Thread.sleep(1000)
@@ -67,61 +67,61 @@ object ConsumerPerformance {
thread.start
for (thread <- threadList)
- thread.shutdown
+ thread.join
val endMs = System.currentTimeMillis
val elapsedSecs = (endMs - startMs - config.consumerConfig.consumerTimeoutMs) / 1000.0
- if(!config.showDetailedStats) {
- val totalMBRead = (totalBytesRead.get*1.0)/(1024*1024)
+ if (!config.showDetailedStats) {
+ val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024)
println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
- config.consumerConfig.fetchMessageMaxBytes, totalMBRead, totalMBRead/elapsedSecs, totalMessagesRead.get,
- totalMessagesRead.get/elapsedSecs))
+ config.consumerConfig.fetchMessageMaxBytes, totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get,
+ totalMessagesRead.get / elapsedSecs))
}
System.exit(0)
}
class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
- "Multiple URLS can be given to allow fail-over.")
- .withRequiredArg
- .describedAs("urls")
- .ofType(classOf[String])
+ "Multiple URLS can be given to allow fail-over.")
+ .withRequiredArg
+ .describedAs("urls")
+ .ofType(classOf[String])
val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
val groupIdOpt = parser.accepts("group", "The group id to consume on.")
- .withRequiredArg
- .describedAs("gid")
- .defaultsTo("perf-consumer-" + new Random().nextInt(100000))
- .ofType(classOf[String])
+ .withRequiredArg
+ .describedAs("gid")
+ .defaultsTo("perf-consumer-" + new Random().nextInt(100000))
+ .ofType(classOf[String])
val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
- .withRequiredArg
- .describedAs("size")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(1024 * 1024)
+ .withRequiredArg
+ .describedAs("size")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(1024 * 1024)
val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
"offset to consume from, start with the latest message present in the log rather than the earliest message.")
val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
- .withRequiredArg
- .describedAs("size")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(2 * 1024 * 1024)
+ .withRequiredArg
+ .describedAs("size")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(2 * 1024 * 1024)
val numThreadsOpt = parser.accepts("threads", "Number of processing threads.")
- .withRequiredArg
- .describedAs("count")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(10)
+ .withRequiredArg
+ .describedAs("count")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(10)
val numFetchersOpt = parser.accepts("num-fetch-threads", "Number of fetcher threads.")
- .withRequiredArg
- .describedAs("count")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(1)
+ .withRequiredArg
+ .describedAs("count")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(1)
- val options = parser.parse(args : _*)
+ val options = parser.parse(args: _*)
- for(arg <- List(topicOpt, zkConnectOpt)) {
- if(!options.has(arg)) {
+ for (arg <- List(topicOpt, zkConnectOpt)) {
+ if (!options.has(arg)) {
System.err.println("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
System.exit(1)
@@ -132,7 +132,7 @@ object ConsumerPerformance {
props.put("group.id", options.valueOf(groupIdOpt))
props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
- props.put("auto.offset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
+ props.put("auto.offset.reset", if (options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
props.put("consumer.timeout.ms", "5000")
props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString)
@@ -147,13 +147,8 @@ object ConsumerPerformance {
}
class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]],
- config:ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong)
+ config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong)
extends Thread(name) {
- private val shutdownLatch = new CountDownLatch(1)
-
- def shutdown(): Unit = {
- shutdownLatch.await
- }
override def run() {
var bytesRead = 0L
@@ -164,43 +159,41 @@ object ConsumerPerformance {
var lastMessagesRead = 0L
try {
- for (messageAndMetadata <- stream if messagesRead < config.numMessages) {
+ val iter = stream.iterator
+ while (iter.hasNext && messagesRead < config.numMessages) {
+ val messageAndMetadata = iter.next
messagesRead += 1
bytesRead += messageAndMetadata.message.length
if (messagesRead % config.reportingInterval == 0) {
- if(config.showDetailedStats)
+ if (config.showDetailedStats)
printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis)
lastReportTime = System.currentTimeMillis
lastMessagesRead = messagesRead
lastBytesRead = bytesRead
}
}
- }
- catch {
+ } catch {
case _: InterruptedException =>
case _: ClosedByInterruptException =>
case _: ConsumerTimeoutException =>
- case e: Throwable => throw e
+ case e: Throwable => e.printStackTrace()
}
totalMessagesRead.addAndGet(messagesRead)
totalBytesRead.addAndGet(bytesRead)
- if(config.showDetailedStats)
+ if (config.showDetailedStats)
printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis)
- shutdownComplete
}
private def printMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long,
- startMs: Long, endMs: Long) = {
+ startMs: Long, endMs: Long) = {
val elapsedMs = endMs - startMs
- val totalMBRead = (bytesRead*1.0)/(1024*1024)
- val mbRead = ((bytesRead - lastBytesRead)*1.0)/(1024*1024)
+ val totalMBRead = (bytesRead * 1.0) / (1024 * 1024)
+ val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id,
config.consumerConfig.fetchMessageMaxBytes, totalMBRead,
- 1000.0*(mbRead/elapsedMs), messagesRead, ((messagesRead - lastMessagesRead)/elapsedMs)*1000.0))
+ 1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0))
}
-
- private def shutdownComplete() = shutdownLatch.countDown
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b80dbb9/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index ad2ac26..5d399d9 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -17,18 +17,19 @@
package kafka.perf
-import java.util.concurrent.{CountDownLatch, Executors}
+import java.util.concurrent.{ CountDownLatch, Executors }
import java.util.concurrent.atomic.AtomicLong
import kafka.producer._
import org.apache.log4j.Logger
-import kafka.message.{CompressionCodec, Message}
+import kafka.message.{ CompressionCodec, Message }
import java.text.SimpleDateFormat
import kafka.serializer._
import java.util._
import collection.immutable.List
-import kafka.utils.{VerifiableProperties, Logging}
+import kafka.utils.{ VerifiableProperties, Logging, Utils }
import kafka.metrics.KafkaMetricsReporter
-
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
/**
* Load test for the producer
@@ -39,7 +40,7 @@ object ProducerPerformance extends Logging {
val logger = Logger.getLogger(getClass)
val config = new ProducerPerfConfig(args)
- if(!config.isFixSize)
+ if (!config.isFixedSize)
logger.info("WARN: Throughput will be slower due to changing message size per request")
val totalBytesSent = new AtomicLong(0)
@@ -49,79 +50,80 @@ object ProducerPerformance extends Logging {
val startMs = System.currentTimeMillis
val rand = new java.util.Random
- if(!config.hideHeader)
- println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +
- "total.data.sent.in.nMsg, nMsg.sec")
+ if (!config.hideHeader)
+ println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +
+ "total.data.sent.in.nMsg, nMsg.sec")
- for(i <- 0 until config.numThreads) {
+ for (i <- 0 until config.numThreads) {
executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))
}
allDone.await()
val endMs = System.currentTimeMillis
val elapsedSecs = (endMs - startMs) / 1000.0
- val totalMBSent = (totalBytesSent.get * 1.0)/ (1024 * 1024)
+ val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024)
println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(
config.dateFormat.format(startMs), config.dateFormat.format(endMs),
config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent,
- totalMBSent/elapsedSecs, totalMessagesSent.get, totalMessagesSent.get/elapsedSecs))
+ totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs))
System.exit(0)
}
class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {
val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info (the list of broker host and port for bootstrap.")
- .withRequiredArg
- .describedAs("hostname:port,..,hostname:port")
- .ofType(classOf[String])
+ .withRequiredArg
+ .describedAs("hostname:port,..,hostname:port")
+ .ofType(classOf[String])
val topicsOpt = parser.accepts("topics", "REQUIRED: The comma separated list of topics to produce to")
.withRequiredArg
.describedAs("topic1,topic2..")
.ofType(classOf[String])
val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
- .withRequiredArg()
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(3000)
+ .withRequiredArg()
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(3000)
val producerNumRetriesOpt = parser.accepts("producer-num-retries", "The producer retries number")
- .withRequiredArg()
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(3)
+ .withRequiredArg()
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(3)
val producerRetryBackOffMsOpt = parser.accepts("producer-retry-backoff-ms", "The producer retry backoff time in milliseconds")
- .withRequiredArg()
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(100)
+ .withRequiredArg()
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(100)
val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " +
- "to complete")
- .withRequiredArg()
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(-1)
+ "to complete")
+ .withRequiredArg()
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(-1)
val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.")
val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
- .withRequiredArg
- .describedAs("number of threads")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(1)
+ .withRequiredArg
+ .describedAs("number of threads")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(1)
val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " +
- "ID and sent by producer starting from this ID sequentially. Message content will be String type and " +
- "in the form of 'Message:000...1:xxx...'")
- .withRequiredArg()
- .describedAs("initial message id")
- .ofType(classOf[java.lang.Integer])
+ "ID and sent by producer starting from this ID sequentially. Message content will be String type and " +
+ "in the form of 'Message:000...1:xxx...'")
+ .withRequiredArg()
+ .describedAs("initial message id")
+ .ofType(classOf[java.lang.Integer])
val messageSendGapMsOpt = parser.accepts("message-send-gap-ms", "If set, the send thread will wait for specified time between two sends")
- .withRequiredArg()
- .describedAs("message send time gap")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(0)
+ .withRequiredArg()
+ .describedAs("message send time gap")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(0)
val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
- "set, the csv metrics will be outputed here")
+ "set, the csv metrics will be outputed here")
.withRequiredArg
.describedAs("metrics dictory")
.ofType(classOf[java.lang.String])
+ val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
- val options = parser.parse(args : _*)
- for(arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) {
- if(!options.has(arg)) {
+ val options = parser.parse(args: _*)
+ for (arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) {
+ if (!options.has(arg)) {
System.err.println("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
System.exit(1)
@@ -135,19 +137,20 @@ object ProducerPerformance extends Logging {
val hideHeader = options.has(hideHeaderOpt)
val brokerList = options.valueOf(brokerListOpt)
val messageSize = options.valueOf(messageSizeOpt).intValue
- var isFixSize = !options.has(varyMessageSizeOpt)
+ var isFixedSize = !options.has(varyMessageSizeOpt)
var isSync = options.has(syncOpt)
var batchSize = options.valueOf(batchSizeOpt).intValue
var numThreads = options.valueOf(numThreadsOpt).intValue
val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue)
val seqIdMode = options.has(initialMessageIdOpt)
var initialMessageId: Int = 0
- if(seqIdMode)
+ if (seqIdMode)
initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue()
val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()
val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue()
val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue()
+ val useNewProducer = options.has(useNewProducerOpt)
val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
@@ -167,44 +170,80 @@ object ProducerPerformance extends Logging {
val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue()
}
- class ProducerThread(val threadId: Int,
- val config: ProducerPerfConfig,
- val totalBytesSent: AtomicLong,
- val totalMessagesSent: AtomicLong,
- val allDone: CountDownLatch,
- val rand: Random) extends Runnable {
+ trait Producer {
+ def send(topic: String, partition: Long, bytes: Array[Byte])
+ def close()
+ }
+
+ class OldRustyProducer(config: ProducerPerfConfig) extends Producer {
val props = new Properties()
props.put("metadata.broker.list", config.brokerList)
props.put("compression.codec", config.compressionCodec.codec.toString)
- props.put("reconnect.interval", Integer.MAX_VALUE.toString)
- props.put("send.buffer.bytes", (64*1024).toString)
- if(!config.isSync) {
- props.put("producer.type","async")
+ props.put("send.buffer.bytes", (64 * 1024).toString)
+ if (!config.isSync) {
+ props.put("producer.type", "async")
props.put("batch.num.messages", config.batchSize.toString)
props.put("queue.enqueue.timeout.ms", "-1")
}
- props.put("client.id", "ProducerPerformance")
+ props.put("client.id", "perf-test")
props.put("request.required.acks", config.producerRequestRequiredAcks.toString)
props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
props.put("message.send.max.retries", config.producerNumRetries.toString)
props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString)
props.put("serializer.class", classOf[DefaultEncoder].getName.toString)
props.put("key.serializer.class", classOf[NullEncoder[Long]].getName.toString)
+ val producer = new kafka.producer.Producer[Long, Array[Byte]](new ProducerConfig(props))
-
- val producerConfig = new ProducerConfig(props)
- val producer = new Producer[Long, Array[Byte]](producerConfig)
- val seqIdNumDigit = 10 // no. of digits for max int value
+ def send(topic: String, partition: Long, bytes: Array[Byte]) {
+ this.producer.send(new KeyedMessage[Long, Array[Byte]](topic, partition, bytes))
+ }
+
+ def close() {
+ this.producer.close()
+ }
+ }
+
+ class NewShinyProducer(config: ProducerPerfConfig) extends Producer {
+ val props = new Properties()
+ props.put("metadata.broker.list", config.brokerList)
+ props.put("send.buffer.bytes", (64 * 1024).toString)
+ props.put("client.id", "perf-test")
+ props.put("request.required.acks", config.producerRequestRequiredAcks.toString)
+ props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
+ val producer = new KafkaProducer(props)
+
+ def send(topic: String, partition: Long, bytes: Array[Byte]) {
+ val part = partition % this.producer.partitionsFor(topic).size
+ this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes))
+ }
+
+ def close() {
+ this.producer.close()
+ }
+ }
+
+ class ProducerThread(val threadId: Int,
+ val config: ProducerPerfConfig,
+ val totalBytesSent: AtomicLong,
+ val totalMessagesSent: AtomicLong,
+ val allDone: CountDownLatch,
+ val rand: Random) extends Runnable {
+ val seqIdNumDigit = 10 // no. of digits for max int value
val messagesPerThread = config.numMessages / config.numThreads
debug("Messages per thread = " + messagesPerThread)
+ val producer =
+ if (config.useNewProducer)
+ new NewShinyProducer(config)
+ else
+ new OldRustyProducer(config)
// generate the sequential message ID
- private val SEP = ":" // message field separator
+ private val SEP = ":" // message field separator
private val messageIdLabel = "MessageID"
- private val threadIdLabel = "ThreadID"
- private val topicLabel = "Topic"
- private var leftPaddedSeqId : String = ""
+ private val threadIdLabel = "ThreadID"
+ private val topicLabel = "Topic"
+ private var leftPaddedSeqId: String = ""
private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
// Each thread gets a unique range of sequential no. for its ids.
@@ -213,48 +252,43 @@ object ProducerPerformance extends Logging {
// thread 1 IDs : 100 ~ 199
// thread 2 IDs : 200 ~ 299
// . . .
- leftPaddedSeqId = String.format("%0"+seqIdNumDigit+"d", long2Long(msgId))
+ leftPaddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId))
- val msgHeader = topicLabel + SEP +
- topic + SEP +
- threadIdLabel + SEP +
- threadId + SEP +
- messageIdLabel + SEP +
- leftPaddedSeqId + SEP
+ val msgHeader = topicLabel + SEP +
+ topic + SEP +
+ threadIdLabel + SEP +
+ threadId + SEP +
+ messageIdLabel + SEP +
+ leftPaddedSeqId + SEP
- val seqMsgString = String.format("%1$-"+msgSize+"s", msgHeader).replace(' ', 'x')
+ val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x')
debug(seqMsgString)
return seqMsgString.getBytes()
}
- private def generateProducerData(topic: String, messageId: Long): (KeyedMessage[Long, Array[Byte]], Int) = {
- val msgSize = if(config.isFixSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
- val message =
- if(config.seqIdMode) {
- val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
- generateMessageWithSeqId(topic, seqId, msgSize)
- } else {
- new Array[Byte](msgSize)
- }
- (new KeyedMessage[Long, Array[Byte]](topic, messageId, message), message.length)
+ private def generateProducerData(topic: String, messageId: Long): Array[Byte] = {
+ val msgSize = if (config.isFixedSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
+ if (config.seqIdMode) {
+ val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
+ generateMessageWithSeqId(topic, seqId, msgSize)
+ } else {
+ new Array[Byte](msgSize)
+ }
}
override def run {
var bytesSent = 0L
var nSends = 0
var j: Long = 0L
- while(j < messagesPerThread) {
+ while (j < messagesPerThread) {
try {
config.topics.foreach(
topic => {
- val (producerData, bytesSent_) = generateProducerData(topic, j)
- bytesSent += bytesSent_
- producer.send(producerData)
+ producer.send(topic, j, generateProducerData(topic, j))
nSends += 1
- if(config.messageSendGapMs > 0)
+ if (config.messageSendGapMs > 0)
Thread.sleep(config.messageSendGapMs)
- }
- )
+ })
} catch {
case e: Exception => error("Error sending messages", e)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b80dbb9/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index ddcfc41..12d84f8 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -140,7 +140,7 @@ object KafkaBuild extends Build {
lazy val kafka = Project(id = "Kafka", base = file(".")).aggregate(core, examples, contrib, perf).settings((commonSettings ++
runRatTask ++ releaseTask ++ releaseZipTask ++ releaseTarTask): _*)
- lazy val core = Project(id = "core", base = file("core")).settings(commonSettings: _*)
+ lazy val core = Project(id = "core", base = file("core")).settings(commonSettings: _*) dependsOn(clients)
lazy val examples = Project(id = "java-examples", base = file("examples")).settings(commonSettings :_*) dependsOn (core)
lazy val perf = Project(id = "perf", base = file("perf")).settings((Seq(name := "kafka-perf") ++ commonSettings):_*) dependsOn (core)