You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/04/13 22:51:12 UTC

kafka git commit: KAFKA-3490; Multiple version support for ducktape performance tests

Repository: kafka
Updated Branches:
  refs/heads/trunk 667ff7ef7 -> c1694833d


KAFKA-3490; Multiple version support for ducktape performance tests

Author: Ismael Juma <is...@juma.me.uk>
Author: Geoff Anderson <ge...@confluent.io>

Reviewers: Geoff Anderson <ge...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1173 from ijuma/kafka-3490-multiple-version-support-perf-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c1694833
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c1694833
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c1694833

Branch: refs/heads/trunk
Commit: c1694833d5c095e47e5767f38c3e85bbe927a0a7
Parents: 667ff7e
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed Apr 13 13:50:49 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Wed Apr 13 13:50:49 2016 -0700

----------------------------------------------------------------------
 .../kafkatest/benchmarks/core/benchmark_test.py |  85 +++++-----
 .../sanity_checks/test_performance_services.py  |  88 ++++++++++
 .../kafkatest/services/performance/__init__.py  |   4 +-
 .../performance/consumer_performance.py         |  45 ++++--
 .../services/performance/end_to_end_latency.py  |  62 ++++++--
 .../services/performance/performance.py         |  23 +++
 .../performance/producer_performance.py         | 159 ++++++++++++++-----
 .../services/templates/tools_log4j.properties   |   2 +-
 vagrant/base.sh                                 |   2 +
 9 files changed, 364 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/benchmarks/core/benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py
index 9c2e32d..d252e5d 100644
--- a/tests/kafkatest/benchmarks/core/benchmark_test.py
+++ b/tests/kafkatest/benchmarks/core/benchmark_test.py
@@ -20,7 +20,8 @@ from ducktape.mark import matrix
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService
+from kafkatest.services.kafka.version import TRUNK, KafkaVersion
+from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService, throughput, latency, compute_aggregate_throughput
 
 
 TOPIC_REP_ONE = "topic-replication-factor-one"
@@ -54,11 +55,12 @@ class Benchmark(Test):
     def setUp(self):
         self.zk.start()
 
-    def start_kafka(self, security_protocol, interbroker_security_protocol):
+    def start_kafka(self, security_protocol, interbroker_security_protocol, version):
         self.kafka = KafkaService(
             self.test_context, self.num_brokers,
             self.zk, security_protocol=security_protocol,
-            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
+            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics,
+            version=version)
         self.kafka.log_level = "INFO"  # We don't DEBUG logging here
         self.kafka.start()
 
@@ -67,7 +69,8 @@ class Benchmark(Test):
     @parametrize(acks=-1, topic=TOPIC_REP_THREE)
     @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
     @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], security_protocol=['PLAINTEXT', 'SSL'])
-    def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT'):
+    def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT',
+                                 client_version=str(TRUNK), broker_version=str(TRUNK)):
         """
         Setup: 1 node zk + 3 node kafka cluster
         Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor,
@@ -76,13 +79,16 @@ class Benchmark(Test):
         Collect and return aggregate throughput statistics after all messages have been acknowledged.
         (This runs ProducerPerformance.java under the hood)
         """
-        self.start_kafka(security_protocol, security_protocol)
+        client_version = KafkaVersion(client_version)
+        broker_version = KafkaVersion(broker_version)
+        self.validate_versions(client_version, broker_version)
+        self.start_kafka(security_protocol, security_protocol, broker_version)
         # Always generate the same total amount of data
         nrecords = int(self.target_data_size / message_size)
 
         self.producer = ProducerPerformanceService(
             self.test_context, num_producers, self.kafka, topic=topic,
-            num_records=nrecords, record_size=message_size,  throughput=-1,
+            num_records=nrecords, record_size=message_size,  throughput=-1, version=client_version,
             settings={
                 'acks': acks,
                 'batch.size': self.batch_size,
@@ -92,7 +98,8 @@ class Benchmark(Test):
 
     @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
     @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol=None):
+    def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol=None,
+                                           client_version=str(TRUNK), broker_version=str(TRUNK)):
         """
         Setup: 1 node zk + 3 node kafka cluster
         Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.
@@ -101,13 +108,16 @@ class Benchmark(Test):
 
         (This runs ProducerPerformance.java under the hood)
         """
+        client_version = KafkaVersion(client_version)
+        broker_version = KafkaVersion(broker_version)
+        self.validate_versions(client_version, broker_version)
         if interbroker_security_protocol is None:
             interbroker_security_protocol = security_protocol
-        self.start_kafka(security_protocol, interbroker_security_protocol)
+        self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
         self.producer = ProducerPerformanceService(
             self.test_context, 1, self.kafka,
             topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
-            throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
+            throughput=-1, version=client_version, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
             intermediate_stats=True
         )
         self.producer.run()
@@ -135,10 +145,10 @@ class Benchmark(Test):
         self.logger.info("\n".join(summary))
         return data
 
-
     @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
     @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
-    def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol=None):
+    def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol=None,
+                                client_version=str(TRUNK), broker_version=str(TRUNK)):
         """
         Setup: 1 node zk + 3 node kafka cluster
         Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3,
@@ -148,13 +158,16 @@ class Benchmark(Test):
 
         (Under the hood, this simply runs EndToEndLatency.scala)
         """
+        client_version = KafkaVersion(client_version)
+        broker_version = KafkaVersion(broker_version)
+        self.validate_versions(client_version, broker_version)
         if interbroker_security_protocol is None:
             interbroker_security_protocol = security_protocol
-        self.start_kafka(security_protocol, interbroker_security_protocol)
+        self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
         self.logger.info("BENCHMARK: End to end latency")
         self.perf = EndToEndLatencyService(
             self.test_context, 1, self.kafka,
-            topic=TOPIC_REP_THREE, num_records=10000
+            topic=TOPIC_REP_THREE, num_records=10000, version=client_version
         )
         self.perf.run()
         return latency(self.perf.results[0]['latency_50th_ms'],  self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
@@ -162,7 +175,8 @@ class Benchmark(Test):
     @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
     @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
     @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_producer_and_consumer(self, security_protocol, interbroker_security_protocol=None, new_consumer=True):
+    def test_producer_and_consumer(self, security_protocol, interbroker_security_protocol=None, new_consumer=True,
+                                   client_version=str(TRUNK), broker_version=str(TRUNK)):
         """
         Setup: 1 node zk + 3 node kafka cluster
         Concurrently produce and consume 10e6 messages with a single producer and a single consumer,
@@ -172,15 +186,18 @@ class Benchmark(Test):
 
         (Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala)
         """
+        client_version = KafkaVersion(client_version)
+        broker_version = KafkaVersion(broker_version)
+        self.validate_versions(client_version, broker_version)
         if interbroker_security_protocol is None:
             interbroker_security_protocol = security_protocol
-        self.start_kafka(security_protocol, interbroker_security_protocol)
+        self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
         num_records = 10 * 1000 * 1000  # 10e6
 
         self.producer = ProducerPerformanceService(
             self.test_context, 1, self.kafka,
             topic=TOPIC_REP_THREE,
-            num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
+            num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, version=client_version,
             settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
         )
         self.consumer = ConsumerPerformanceService(
@@ -200,21 +217,25 @@ class Benchmark(Test):
     @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
     @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
     @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_consumer_throughput(self, security_protocol, interbroker_security_protocol=None, new_consumer=True, num_consumers=1):
+    def test_consumer_throughput(self, security_protocol, interbroker_security_protocol=None, new_consumer=True, num_consumers=1,
+                                 client_version=str(TRUNK), broker_version=str(TRUNK)):
         """
         Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions
         (using new consumer iff new_consumer == True), and report throughput.
         """
+        client_version = KafkaVersion(client_version)
+        broker_version = KafkaVersion(broker_version)
+        self.validate_versions(client_version, broker_version)
         if interbroker_security_protocol is None:
             interbroker_security_protocol = security_protocol
-        self.start_kafka(security_protocol, interbroker_security_protocol)
+        self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
         num_records = 10 * 1000 * 1000  # 10e6
 
         # seed kafka w/messages
         self.producer = ProducerPerformanceService(
             self.test_context, 1, self.kafka,
             topic=TOPIC_REP_THREE,
-            num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
+            num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, version=client_version,
             settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
         )
         self.producer.run()
@@ -227,27 +248,5 @@ class Benchmark(Test):
         self.consumer.run()
         return compute_aggregate_throughput(self.consumer)
 
-
-def throughput(records_per_sec, mb_per_sec):
-    """Helper method to ensure uniform representation of throughput data"""
-    return {
-        "records_per_sec": records_per_sec,
-        "mb_per_sec": mb_per_sec
-    }
-
-
-def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms):
-    """Helper method to ensure uniform representation of latency data"""
-    return {
-        "latency_50th_ms": latency_50th_ms,
-        "latency_99th_ms": latency_99th_ms,
-        "latency_999th_ms": latency_999th_ms
-    }
-
-
-def compute_aggregate_throughput(perf):
-    """Helper method for computing throughput after running a performance service."""
-    aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
-    aggregate_mbps = sum([r['mbps'] for r in perf.results])
-
-    return throughput(aggregate_rate, aggregate_mbps)
+    def validate_versions(self, client_version, broker_version):  # clients must not be newer than the broker
+        assert client_version <= broker_version, "Client version %s should be <= broker version %s" % (client_version, broker_version)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/sanity_checks/test_performance_services.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_performance_services.py b/tests/kafkatest/sanity_checks/test_performance_services.py
new file mode 100644
index 0000000..16d5d32
--- /dev/null
+++ b/tests/kafkatest/sanity_checks/test_performance_services.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark import parametrize
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2, LATEST_0_9, KafkaVersion
+from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService
+from kafkatest.services.performance import latency, compute_aggregate_throughput
+
+
+class PerformanceServiceTest(Test):
+    def __init__(self, test_context):
+        super(PerformanceServiceTest, self).__init__(test_context)
+        self.record_size = 100
+        self.num_records = 10000
+        self.topic = "topic"
+
+        self.zk = ZookeeperService(test_context, 1)
+
+    def setUp(self):
+        self.zk.start()
+
+    # We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check,
+    # the overhead should be manageable.
+    @parametrize(version=str(LATEST_0_8_2))
+    @parametrize(version=str(LATEST_0_9), new_consumer=False)
+    @parametrize(version=str(LATEST_0_9), new_consumer=True)
+    @parametrize(version=str(TRUNK), new_consumer=False)
+    @parametrize(version=str(TRUNK), new_consumer=True)
+    def test_version(self, version=str(LATEST_0_9), new_consumer=False):
+        """
+        Sanity check our performance services (producer performance, end-to-end latency, consumer performance) - verify
+        that each one runs with a small number of messages. The actual stats here are pretty meaningless since the number of messages is quite small.
+        """
+        version = KafkaVersion(version)
+        self.kafka = KafkaService(
+            self.test_context, 1,
+            self.zk, topics={self.topic: {'partitions': 1, 'replication-factor': 1}}, version=version)
+        self.kafka.start()
+
+        # check basic run of producer performance
+        self.producer_perf = ProducerPerformanceService(
+            self.test_context, 1, self.kafka, topic=self.topic,
+            num_records=self.num_records, record_size=self.record_size,
+            throughput=1000000000,  # Set impossibly high so there is no throttling, for equivalent behavior between 0.8.X and 0.9.X
+            version=version,
+            settings={
+                'acks': 1,
+                'batch.size': 8*1024,
+                'buffer.memory': 64*1024*1024})
+        self.producer_perf.run()
+        producer_perf_data = compute_aggregate_throughput(self.producer_perf)
+
+        # check basic run of end to end latency
+        self.end_to_end = EndToEndLatencyService(
+            self.test_context, 1, self.kafka,
+            topic=self.topic, num_records=self.num_records, version=version)
+        self.end_to_end.run()
+        end_to_end_data = latency(self.end_to_end.results[0]['latency_50th_ms'], self.end_to_end.results[0]['latency_99th_ms'], self.end_to_end.results[0]['latency_999th_ms'])
+
+        # check basic run of consumer performance service
+        self.consumer_perf = ConsumerPerformanceService(
+            self.test_context, 1, self.kafka, new_consumer=new_consumer,
+            topic=self.topic, version=version, messages=self.num_records)
+        self.consumer_perf.group = "test-consumer-group"
+        self.consumer_perf.run()
+        consumer_perf_data = compute_aggregate_throughput(self.consumer_perf)
+
+        return {
+            "producer_performance": producer_perf_data,
+            "end_to_end_latency": end_to_end_data,
+            "consumer_performance": consumer_perf_data
+        }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/services/performance/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/__init__.py b/tests/kafkatest/services/performance/__init__.py
index a72e3b7..9eddcaa 100644
--- a/tests/kafkatest/services/performance/__init__.py
+++ b/tests/kafkatest/services/performance/__init__.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from performance import PerformanceService
+from performance import PerformanceService, throughput, latency, compute_aggregate_throughput
 from end_to_end_latency import EndToEndLatencyService
 from producer_performance import ProducerPerformanceService
-from consumer_performance import ConsumerPerformanceService
\ No newline at end of file
+from consumer_performance import ConsumerPerformanceService

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/services/performance/consumer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py
index f8289bc..def27b1 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -14,8 +14,9 @@
 # limitations under the License.
 
 from kafkatest.services.performance import PerformanceService
-from kafkatest.services.kafka.directory import kafka_dir
 from kafkatest.services.security.security_config import SecurityConfig
+from kafkatest.services.kafka.directory import kafka_dir
+from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0
 
 import os
 
@@ -69,7 +70,7 @@ class ConsumerPerformanceService(PerformanceService):
             "collect_default": True}
     }
 
-    def __init__(self, context, num_nodes, kafka, topic, messages, new_consumer=False, settings={}):
+    def __init__(self, context, num_nodes, kafka, topic, messages, version=TRUNK, new_consumer=False, settings={}):
         super(ConsumerPerformanceService, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.security_config = kafka.security_config.client_config()
@@ -78,6 +79,13 @@ class ConsumerPerformanceService(PerformanceService):
         self.new_consumer = new_consumer
         self.settings = settings
 
+        assert version >= V_0_9_0_0 or (not new_consumer), \
+            "new_consumer is only supported if version >= 0.9.0.0, version %s" % str(version)
+
+        security_protocol = self.security_config.security_protocol
+        assert version >= V_0_9_0_0 or security_protocol == SecurityConfig.PLAINTEXT, \
+            "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))
+
         # These less-frequently used settings can be updated manually after instantiation
         self.fetch_size = None
         self.socket_buffer_size = None
@@ -86,6 +94,9 @@ class ConsumerPerformanceService(PerformanceService):
         self.group = None
         self.from_latest = None
 
+        for node in self.nodes:
+            node.version = version
+
     @property
     def args(self):
         """Dictionary of arguments used to start the Consumer Performance script."""
@@ -127,7 +138,10 @@ class ConsumerPerformanceService(PerformanceService):
         cmd += " /opt/%s/bin/kafka-consumer-perf-test.sh" % kafka_dir(node)
         for key, value in self.args.items():
             cmd += " --%s %s" % (key, value)
-        cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE
+
+        if node.version >= V_0_9_0_0:
+            # This is only used for security settings
+            cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE
 
         for key, value in self.settings.items():
             cmd += " %s=%s" % (str(key), str(value))
@@ -136,6 +150,22 @@ class ConsumerPerformanceService(PerformanceService):
                                                         'stderr': ConsumerPerformanceService.STDERR_CAPTURE}
         return cmd
 
+    def parse_results(self, line, version):
+        """Turn the last comma-separated line printed by kafka-consumer-perf-test.sh into a result dict.
+
+        Before 0.9.0.0 the line carries one extra column ahead of the fields read here, so each index
+        is shifted right by one for those versions.
+
+        Returns a dict with keys 'total_mb', 'mbps' and 'records_per_sec', each parsed as a float.
+        """
+        columns = line.split(',')
+        shift = 0 if version >= V_0_9_0_0 else 1
+        return {
+            'total_mb': float(columns[2 + shift]),
+            'mbps': float(columns[3 + shift]),
+            'records_per_sec': float(columns[5 + shift]),
+        }
+
     def _worker(self, idx, node):
         node.account.ssh("mkdir -p %s" % ConsumerPerformanceService.PERSISTENT_ROOT, allow_fail=False)
 
@@ -149,11 +179,6 @@ class ConsumerPerformanceService(PerformanceService):
         last = None
         for line in node.account.ssh_capture(cmd):
             last = line
-        # Parse and save the last line's information
-        parts = last.split(',')
 
-        self.results[idx-1] = {
-            'total_mb': float(parts[2]),
-            'mbps': float(parts[3]),
-            'records_per_sec': float(parts[5]),
-        }
+        # Parse and save the last line's information
+        self.results[idx-1] = self.parse_results(last, node.version)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/services/performance/end_to_end_latency.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py
index 049eebc..08eff70 100644
--- a/tests/kafkatest/services/performance/end_to_end_latency.py
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -17,9 +17,11 @@ from kafkatest.services.performance import PerformanceService
 from kafkatest.services.security.security_config import SecurityConfig
 
 from kafkatest.services.kafka.directory import kafka_dir
+from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0
 
 
 class EndToEndLatencyService(PerformanceService):
+    MESSAGE_BYTES = 21  # 0.8.X messages are fixed at 21 bytes, so we'll match that for other versions
 
     logs = {
         "end_to_end_latency_log": {
@@ -27,37 +29,79 @@ class EndToEndLatencyService(PerformanceService):
             "collect_default": True},
     }
 
-    def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
+    def __init__(self, context, num_nodes, kafka, topic, num_records, version=TRUNK, consumer_fetch_max_wait=100, acks=1):
         super(EndToEndLatencyService, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.security_config = kafka.security_config.client_config()
+
+        security_protocol = self.security_config.security_protocol  # pre-0.9.0.0 tools only run with PLAINTEXT
+        assert version >= V_0_9_0_0 or security_protocol == SecurityConfig.PLAINTEXT, \
+            "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (security_protocol, str(version))
+
         self.args = {
             'topic': topic,
             'num_records': num_records,
             'consumer_fetch_max_wait': consumer_fetch_max_wait,
             'acks': acks,
-            'kafka_opts': self.security_config.kafka_opts
+            'kafka_opts': self.security_config.kafka_opts,
+            'message_bytes': EndToEndLatencyService.MESSAGE_BYTES
         }
 
-    def _worker(self, idx, node):
-        args = self.args.copy()
-        self.security_config.setup_node(node)
+        for node in self.nodes:  # start_cmd picks the tool and argument layout from node.version
+            node.version = version
+
+    @property
+    def security_config_file(self):  # path for the client security properties file, or "" when the protocol is PLAINTEXT
         if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
             security_config_file = SecurityConfig.CONFIG_DIR + "/security.properties"
-            node.account.create_file(security_config_file, str(self.security_config))
         else:
             security_config_file = ""
+        return security_config_file
+
+    def start_cmd(self, node):  # build the end-to-end latency shell command for this node's Kafka version
+        args = self.args.copy()
         args.update({
             'zk_connect': self.kafka.zk.connect_setting(),
             'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
-            'security_config_file': security_config_file,
+            'security_config_file': self.security_config_file,
             'kafka_dir': kafka_dir(node)
         })
 
-        cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % args
-        cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d 20 %(security_config_file)s" % args
+        if node.version >= V_0_9_0_0:
+            # Positional arguments expected by kafka.tools.EndToEndLatency (>= 0.9.0.0):
+            #   val brokerList = args(0)
+            #   val topic = args(1)
+            #   val numMessages = args(2).toInt
+            #   val producerAcks = args(3)
+            #   val messageLen = args(4).toInt
+            # followed here by the security config file path ("" for PLAINTEXT).
+
+            cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % args
+            cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(security_config_file)s" % args
+        else:
+            # Positional arguments expected by kafka.tools.TestEndToEndLatency (< 0.9.0.0):
+            #   val brokerList = args(0)
+            #   val zkConnect = args(1)
+            #   val topic = args(2)
+            #   val numMessages = args(3).toInt
+            #   val consumerFetchMaxWait = args(4).toInt
+            #   val producerAcks = args(5).toInt
+            # No security config file is passed; __init__ only allows PLAINTEXT below 0.9.0.0.
+
+            # Pass 0 as consumerFetchMaxWait to match behavior in later versions
+            cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency " % args
+            cmd += "%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d 0 %(acks)d" % args
+
         cmd += " | tee /mnt/end-to-end-latency.log"
 
+        return cmd
+
+    def _worker(self, idx, node):
+        self.security_config.setup_node(node)
+        if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
+            node.account.create_file(self.security_config_file, str(self.security_config))
+
+        cmd = self.start_cmd(node)
         self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
         results = {}
         for line in node.account.ssh_capture(cmd):

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/services/performance/performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/performance.py b/tests/kafkatest/services/performance/performance.py
index 6d286f6..1eab197 100644
--- a/tests/kafkatest/services/performance/performance.py
+++ b/tests/kafkatest/services/performance/performance.py
@@ -27,3 +27,26 @@ class PerformanceService(BackgroundThreadService):
         node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
         node.account.ssh("rm -rf /mnt/*", allow_fail=False)
 
+def throughput(records_per_sec, mb_per_sec):
+    """Bundle a throughput measurement into the uniform dict shape reported by the perf tests."""
+    data = {"records_per_sec": records_per_sec,
+            "mb_per_sec": mb_per_sec}
+
+    return data
+
+
+def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms):
+    """Bundle the 50th/99th/999th latency values (ms) into the uniform dict shape reported by the perf tests."""
+    data = {"latency_50th_ms": latency_50th_ms,
+            "latency_99th_ms": latency_99th_ms,
+            "latency_999th_ms": latency_999th_ms}
+
+    return data
+
+
+def compute_aggregate_throughput(perf):
+    """Sum 'records_per_sec' and 'mbps' across perf.results (one entry per node) and wrap them with throughput()."""
+    aggregate_rate = sum(r['records_per_sec'] for r in perf.results)
+    aggregate_mbps = sum(r['mbps'] for r in perf.results)
+
+    return throughput(aggregate_rate, aggregate_mbps)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index 7cbc7bb..f4887ed 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -13,26 +13,56 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.utils.util import wait_until
+
 from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.performance import PerformanceService
-import itertools
 from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.services.kafka.directory import kafka_dir
+from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
+from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0
+
+import os
+import subprocess
+
 
 class ProducerPerformanceService(JmxMixin, PerformanceService):
 
-    logs = {
-        "producer_performance_log": {
-            "path": "/mnt/producer-performance.log",
-            "collect_default": True},
-    }
+    PERSISTENT_ROOT = "/mnt/producer_performance"
+    STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "producer_performance.stdout")
+    STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "producer_performance.stderr")
+    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+    LOG_FILE = os.path.join(LOG_DIR, "producer_performance.log")
+    LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
 
-    def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={},
+    def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, version=TRUNK, settings={},
                  intermediate_stats=False, client_id="producer-performance", jmx_object_names=None, jmx_attributes=[]):
         JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
         PerformanceService.__init__(self, context, num_nodes)
+        # NOTE(review): start_cmd() calls self.settings.update(); if self.settings aliases the shared settings={} default, those entries leak across instances - confirm it is copied
+        self.logs = {
+            "producer_performance_stdout": {
+                "path": ProducerPerformanceService.STDOUT_CAPTURE,
+                "collect_default": True},
+            "producer_performance_stderr": {
+                "path": ProducerPerformanceService.STDERR_CAPTURE,
+                "collect_default": True},
+            "producer_performance_log": {
+                "path": ProducerPerformanceService.LOG_FILE,
+                "collect_default": True},
+            "jmx_log": {
+                "path": "/mnt/jmx_tool.log",
+                "collect_default": jmx_object_names is not None
+            }
+
+        }
+
         self.kafka = kafka
         self.security_config = kafka.security_config.client_config()
+
+        security_protocol = self.security_config.security_protocol
+        assert version >= V_0_9_0_0 or security_protocol == SecurityConfig.PLAINTEXT, \
+            "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (security_protocol, str(version))
+
         self.args = {
             'topic': topic,
             'kafka_opts': self.security_config.kafka_opts,
@@ -44,7 +74,10 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
         self.intermediate_stats = intermediate_stats
         self.client_id = client_id
 
-    def _worker(self, idx, node):
+        for node in self.nodes:  # start_cmd() checks node.version; versions < TRUNK get trunk's tools jars on CLASSPATH
+            node.version = version
+
+    def start_cmd(self, node):
         args = self.args.copy()
         args.update({
             'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
@@ -52,48 +85,92 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
             'client_id': self.client_id,
             'kafka_directory': kafka_dir(node)
             })
-        cmd = "JMX_PORT=%(jmx_port)d KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_directory)s/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
+
+        cmd = ""
+
+        if node.version < TRUNK:
+            # In order to ensure more consistent configuration between versions, always use the ProducerPerformance
+            # tool from trunk
+            cmd += "for file in /opt/%s/tools/build/libs/kafka-tools*.jar; do CLASSPATH=$CLASSPATH:$file; done; " % KAFKA_TRUNK
+            cmd += "for file in /opt/%s/tools/build/dependant-libs-${SCALA_VERSION}*/*.jar; do CLASSPATH=$CLASSPATH:$file; done; " % KAFKA_TRUNK
+            cmd += "export CLASSPATH; "
+
+        cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % ProducerPerformanceService.LOG4J_CONFIG
+        cmd += "JMX_PORT=%(jmx_port)d KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_directory)s/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
               "--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args
 
         self.security_config.setup_node(node)
         if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
             self.settings.update(self.security_config.properties)
+
         for key, value in self.settings.items():
             cmd += " %s=%s" % (str(key), str(value))
-        cmd += " | tee /mnt/producer-performance.log"
 
+        cmd += " 2>>%s | tee %s" % (ProducerPerformanceService.STDERR_CAPTURE, ProducerPerformanceService.STDOUT_CAPTURE)
+        return cmd
+
+    def pids(self, node):
+        try:
+            cmd = "jps | grep -i ProducerPerformance | awk '{print $1}'"
+            pid_arr = list(node.account.ssh_capture(cmd, allow_fail=True, callback=int))
+            return pid_arr
+        except (subprocess.CalledProcessError, ValueError):
+            return []
+
+    def alive(self, node):
+        return len(self.pids(node)) > 0
+
+    def _worker(self, idx, node):
+        """Run ProducerPerformance on node; store final stats in self.results[idx-1] (intermediate ones in self.stats[idx-1])."""
+        node.account.ssh("mkdir -p %s" % ProducerPerformanceService.PERSISTENT_ROOT, allow_fail=False)
+
+        # Create and upload log properties
+        log_config = self.render('tools_log4j.properties', log_file=ProducerPerformanceService.LOG_FILE)
+        node.account.create_file(ProducerPerformanceService.LOG4J_CONFIG, log_config)
+
+        cmd = self.start_cmd(node)
         self.logger.debug("Producer performance %d command: %s", idx, cmd)
 
-        def parse_stats(line):
-            parts = line.split(',')
-            return {
-                'records': int(parts[0].split()[0]),
-                'records_per_sec': float(parts[1].split()[0]),
-                'mbps': float(parts[1].split('(')[1].split()[0]),
-                'latency_avg_ms': float(parts[2].split()[0]),
-                'latency_max_ms': float(parts[3].split()[0]),
-                'latency_50th_ms': float(parts[4].split()[0]),
-                'latency_95th_ms': float(parts[5].split()[0]),
-                'latency_99th_ms': float(parts[6].split()[0]),
-                'latency_999th_ms': float(parts[7].split()[0]),
-            }
-        last = None
+        # start ProducerPerformance process
         producer_output = node.account.ssh_capture(cmd)
+        wait_until(lambda: self.alive(node), timeout_sec=20, err_msg="ProducerPerformance failed to start")
+        # block until there is at least one line of output
         first_line = next(producer_output, None)
+        if first_line is None:
+            raise Exception("No output from ProducerPerformance")
+
+        self.start_jmx_tool(idx, node)
+        wait_until(lambda: not self.alive(node), timeout_sec=1200, err_msg="ProducerPerformance failed to finish")
+        self.read_jmx_output(idx, node)
+
+        # parse producer output from file
+        last = None
+        producer_output = node.account.ssh_capture("cat %s" % ProducerPerformanceService.STDOUT_CAPTURE)
+        for line in producer_output:
+            if self.intermediate_stats:
+                try:
+                    self.stats[idx-1].append(self.parse_stats(line))
+                except (IndexError, ValueError):
+                    # Sometimes there are extraneous log messages
+                    pass
 
-        if first_line is not None:
-            self.start_jmx_tool(idx, node)
-            for line in itertools.chain([first_line], producer_output):
-                if self.intermediate_stats:
-                    try:
-                        self.stats[idx-1].append(parse_stats(line))
-                    except:
-                        # Sometimes there are extraneous log messages
-                        pass
-
-                last = line
-            try:
-                self.results[idx-1] = parse_stats(last)
-            except:
-                raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last))
-            self.read_jmx_output(idx, node)
+            last = line
+        try:
+            self.results[idx-1] = self.parse_stats(last)
+        except (AttributeError, IndexError, ValueError):
+            raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last))
+
+    def parse_stats(self, line):
+        """Parse one ProducerPerformance stats line; IndexError/ValueError if malformed, AttributeError if None."""
+        parts = line.split(',')
+        return {
+            'records': int(parts[0].split()[0]),
+            'records_per_sec': float(parts[1].split()[0]),
+            'mbps': float(parts[1].split('(')[1].split()[0]),
+            'latency_avg_ms': float(parts[2].split()[0]),
+            'latency_max_ms': float(parts[3].split()[0]),
+            'latency_50th_ms': float(parts[4].split()[0]),
+            'latency_95th_ms': float(parts[5].split()[0]),
+            'latency_99th_ms': float(parts[6].split()[0]),
+            'latency_999th_ms': float(parts[7].split()[0]),
+        }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/services/templates/tools_log4j.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/tools_log4j.properties b/tests/kafkatest/services/templates/tools_log4j.properties
index 6fec1d6..55ae4e0 100644
--- a/tests/kafkatest/services/templates/tools_log4j.properties
+++ b/tests/kafkatest/services/templates/tools_log4j.properties
@@ -22,4 +22,4 @@ log4j.appender.FILE.ImmediateFlush=true
 # Set the append to true
 log4j.appender.FILE.Append=true
 log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
-log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n
\ No newline at end of file
+log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/vagrant/base.sh
----------------------------------------------------------------------
diff --git a/vagrant/base.sh b/vagrant/base.sh
index d271f87..da7737c 100644
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -63,7 +63,9 @@ get_kafka() {
 }
 
 get_kafka 0.8.2.2
+chmod a+rw /opt/kafka-0.8.2.2  # non-recursive: only the install dir itself; NOTE(review): presumably so tools run from it can create files there - confirm
 get_kafka 0.9.0.1
+chmod a+rw /opt/kafka-0.9.0.1  # same treatment for the 0.9.0.1 install dir
 
 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local
 # VMs, we can just create it if it doesn't exist and use it like we'd use