You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/27 23:18:32 UTC

[1/2] kafka git commit: KAFKA-1888: rolling upgrade test

Repository: kafka
Updated Branches:
  refs/heads/trunk af42c3789 -> e6b343302


http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/performance/jmx_mixin.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/jmx_mixin.py b/tests/kafkatest/services/performance/jmx_mixin.py
deleted file mode 100644
index 7e19839..0000000
--- a/tests/kafkatest/services/performance/jmx_mixin.py
+++ /dev/null
@@ -1,81 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-class JmxMixin(object):
-
-    def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=[]):
-        self.jmx_object_names = jmx_object_names
-        self.jmx_attributes = jmx_attributes
-        self.jmx_port = 9192
-
-        self.started = [False] * num_nodes
-        self.jmx_stats = [{} for x in range(num_nodes)]
-        self.maximum_jmx_value = {}  # map from object_attribute_name to maximum value observed over time
-        self.average_jmx_value = {}  # map from object_attribute_name to average value observed over time
-
-    def clean_node(self, node):
-        node.account.kill_process("jmx", clean_shutdown=False, allow_fail=True)
-        node.account.ssh("rm -rf /mnt/jmx_tool.log", allow_fail=False)
-
-    def start_jmx_tool(self, idx, node):
-        if self.started[idx-1] == True or self.jmx_object_names == None:
-            return
-        self.started[idx-1] = True
-
-        cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.JmxTool " \
-              "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port
-        for jmx_object_name in self.jmx_object_names:
-            cmd += " --object-name %s" % jmx_object_name
-        for jmx_attribute in self.jmx_attributes:
-            cmd += " --attributes %s" % jmx_attribute
-        cmd += " | tee -a /mnt/jmx_tool.log"
-
-        self.logger.debug("Start JmxTool %d command: %s", idx, cmd)
-        jmx_output = node.account.ssh_capture(cmd, allow_fail=False)
-        jmx_output.next()
-
-    def read_jmx_output(self, idx, node):
-        if self.started[idx-1] == False:
-            return
-        self.maximum_jmx_value = {}
-        self.average_jmx_value = {}
-        object_attribute_names = []
-
-        cmd = "cat /mnt/jmx_tool.log"
-        self.logger.debug("Read jmx output %d command: %s", idx, cmd)
-        for line in node.account.ssh_capture(cmd, allow_fail=False):
-            if "time" in line:
-                object_attribute_names = line.strip()[1:-1].split("\",\"")[1:]
-                continue
-            stats = [float(field) for field in line.split(',')]
-            time_sec = int(stats[0]/1000)
-            self.jmx_stats[idx-1][time_sec] = {name : stats[i+1] for i, name in enumerate(object_attribute_names)}
-
-        # do not calculate average and maximum of jmx stats until we have read output from all nodes
-        if any(len(time_to_stats)==0 for time_to_stats in self.jmx_stats):
-            return
-
-        start_time_sec = min([min(time_to_stats.keys()) for time_to_stats in self.jmx_stats])
-        end_time_sec = max([max(time_to_stats.keys()) for time_to_stats in self.jmx_stats])
-
-        for name in object_attribute_names:
-            aggregates_per_time = []
-            for time_sec in xrange(start_time_sec, end_time_sec+1):
-                # assume that value is 0 if it is not read by jmx tool at the given time. This is appropriate for metrics such as bandwidth
-                values_per_node = [time_to_stats.get(time_sec, {}).get(name, 0) for time_to_stats in self.jmx_stats]
-                # assume that value is aggregated across nodes by sum. This is appropriate for metrics such as bandwidth
-                aggregates_per_time.append(sum(values_per_node))
-            self.average_jmx_value[name] = sum(aggregates_per_time)/len(aggregates_per_time)
-            self.maximum_jmx_value[name] = max(aggregates_per_time)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index 25911af..401d6f7 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -13,10 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from kafkatest.services.performance.jmx_mixin import JmxMixin
+from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.performance import PerformanceService
 import itertools
 from kafkatest.utils.security_config import SecurityConfig
+from kafkatest.services.kafka.directory import kafka_dir
 
 class ProducerPerformanceService(JmxMixin, PerformanceService):
 
@@ -45,8 +46,13 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
 
     def _worker(self, idx, node):
         args = self.args.copy()
-        args.update({'bootstrap_servers': self.kafka.bootstrap_servers(), 'jmx_port': self.jmx_port, 'client_id': self.client_id})
-        cmd = "JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
+        args.update({
+            'bootstrap_servers': self.kafka.bootstrap_servers(),
+            'jmx_port': self.jmx_port,
+            'client_id': self.client_id,
+            'kafka_directory': kafka_dir(node)
+            })
+        cmd = "JMX_PORT=%(jmx_port)d /opt/%(kafka_directory)s/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
               "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args
 
         self.security_config.setup_node(node)
@@ -73,19 +79,21 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
             }
         last = None
         producer_output = node.account.ssh_capture(cmd)
-        first_line = producer_output.next()
-        self.start_jmx_tool(idx, node)
-        for line in itertools.chain([first_line], producer_output):
-            if self.intermediate_stats:
-                try:
-                    self.stats[idx-1].append(parse_stats(line))
-                except:
-                    # Sometimes there are extraneous log messages
-                    pass
+        first_line = next(producer_output, None)
 
-            last = line
-        try:
-            self.results[idx-1] = parse_stats(last)
-        except:
-            raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last))
-        self.read_jmx_output(idx, node)
+        if first_line is not None:
+            self.start_jmx_tool(idx, node)
+            for line in itertools.chain([first_line], producer_output):
+                if self.intermediate_stats:
+                    try:
+                        self.stats[idx-1].append(parse_stats(line))
+                    except:
+                        # Sometimes there are extraneous log messages
+                        pass
+
+                last = line
+            try:
+                self.results[idx-1] = parse_stats(last)
+            except:
+                raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last))
+            self.read_jmx_output(idx, node)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/kafka.properties b/tests/kafkatest/services/templates/kafka.properties
deleted file mode 100644
index a7f6604..0000000
--- a/tests/kafkatest/services/templates/kafka.properties
+++ /dev/null
@@ -1,74 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.server.KafkaConfig for additional details and defaults
-
-
-broker.id={{ broker_id }}
-port=9092
-#host.name=localhost
-advertised.host.name={{ node.account.hostname }}
-#advertised.port=<port accessible by clients>
-{% if security_protocol == interbroker_security_protocol %}
-listeners={{ security_protocol }}://:{{ port }}
-advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:{{ port }}
-{% else %}
-listeners=PLAINTEXT://:9092,SSL://:9093
-advertised.listeners=PLAINTEXT://{{ node.account.hostname }}:9092,SSL://{{ node.account.hostname }}:9093
-{% endif %}
-num.network.threads=3
-num.io.threads=8
-socket.send.buffer.bytes=102400
-socket.receive.buffer.bytes=65536
-socket.request.max.bytes=104857600
-
-log.dirs=/mnt/kafka-logs
-num.partitions=1
-num.recovery.threads.per.data.dir=1
-#log.flush.interval.messages=10000
-#log.flush.interval.ms=1000
-log.retention.hours=168
-#log.retention.bytes=1073741824
-log.segment.bytes=1073741824
-log.retention.check.interval.ms=300000
-log.cleaner.enable=false
-
-zookeeper.connect={{ zk.connect_setting() }}
-zookeeper.connection.timeout.ms=2000
-
-{% if quota_config.quota_producer_default is defined and quota_config.quota_producer_default is not none %}
-quota.producer.default={{ quota_config.quota_producer_default }}
-{% endif %}
-
-{% if quota_config.quota_consumer_default is defined and quota_config.quota_consumer_default is not none %}
-quota.consumer.default={{ quota_config.quota_consumer_default }}
-{% endif %}
-
-{% if quota_config.quota_producer_bytes_per_second_overrides is defined and quota_config.quota_producer_bytes_per_second_overrides is not none %}
-quota.producer.bytes.per.second.overrides={{ quota_config.quota_producer_bytes_per_second_overrides }}
-{% endif %}
-
-{% if quota_config.quota_consumer_bytes_per_second_overrides is defined and quota_config.quota_consumer_bytes_per_second_overrides is not none %}
-quota.consumer.bytes.per.second.overrides={{ quota_config.quota_consumer_bytes_per_second_overrides }}
-{% endif %}
-
-security.inter.broker.protocol={{ interbroker_security_protocol }}
-ssl.keystore.location=/mnt/ssl/test.keystore.jks
-ssl.keystore.password=test-ks-passwd
-ssl.key.password=test-key-passwd
-ssl.keystore.type=JKS
-ssl.truststore.location=/mnt/ssl/test.truststore.jks
-ssl.truststore.password=test-ts-passwd
-ssl.truststore.type=JKS
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 7ae7988..a95a0d6 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -14,28 +14,49 @@
 # limitations under the License.
 
 from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
+from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2
 from kafkatest.utils.security_config import SecurityConfig
 
 import json
+import os
+import subprocess
+import time
 
 
 class VerifiableProducer(BackgroundThreadService):
+    PERSISTENT_ROOT = "/mnt/verifiable_producer"
+    STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stdout")
+    STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stderr")
+    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+    LOG_FILE = os.path.join(LOG_DIR, "verifiable_producer.log")
+    LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+    CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.properties")
 
-    CONFIG_FILE = "/mnt/verifiable_producer.properties"
     logs = {
-        "producer_log": {
-            "path": "/mnt/producer.log",
-            "collect_default": False}
-    }
-
-    def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, max_messages=-1, throughput=100000):
+        "verifiable_producer_stdout": {
+            "path": STDOUT_CAPTURE,
+            "collect_default": False},
+        "verifiable_producer_stderr": {
+            "path": STDERR_CAPTURE,
+            "collect_default": False},
+        "verifiable_producer_log": {
+            "path": LOG_FILE,
+            "collect_default": True}
+        }
+
+    def __init__(self, context, num_nodes, kafka, topic, security_protocol=SecurityConfig.PLAINTEXT, max_messages=-1, throughput=100000, version=TRUNK):
         super(VerifiableProducer, self).__init__(context, num_nodes)
+        self.log_level = "TRACE"
 
         self.kafka = kafka
         self.topic = topic
         self.max_messages = max_messages
         self.throughput = throughput
 
+        for node in self.nodes:
+            node.version = version
         self.acked_values = []
         self.not_acked_values = []
 
@@ -45,15 +66,24 @@ class VerifiableProducer(BackgroundThreadService):
         self.prop_file += str(self.security_config)
 
     def _worker(self, idx, node):
+        node.account.ssh("mkdir -p %s" % VerifiableProducer.PERSISTENT_ROOT, allow_fail=False)
+
+        # Create and upload log properties
+        log_config = self.render('tools_log4j.properties', log_file=VerifiableProducer.LOG_FILE)
+        node.account.create_file(VerifiableProducer.LOG4J_CONFIG, log_config)
+
         # Create and upload config file
         self.logger.info("verifiable_producer.properties:")
         self.logger.info(self.prop_file)
         node.account.create_file(VerifiableProducer.CONFIG_FILE, self.prop_file)
         self.security_config.setup_node(node)
 
-        cmd = self.start_cmd
+        cmd = self.start_cmd(node)
         self.logger.debug("VerifiableProducer %d command: %s" % (idx, cmd))
 
+
+        last_produced_time = time.time()
+        prev_msg = None
         for line in node.account.ssh_capture(cmd):
             line = line.strip()
 
@@ -68,9 +98,30 @@ class VerifiableProducer(BackgroundThreadService):
                     elif data["name"] == "producer_send_success":
                         self.acked_values.append(int(data["value"]))
 
-    @property
-    def start_cmd(self):
-        cmd = "/opt/kafka/bin/kafka-verifiable-producer.sh" \
+                        # Log information if there is a large gap between successively acknowledged messages
+                        t = time.time()
+                        time_delta_sec = t - last_produced_time
+                        if time_delta_sec > 2 and prev_msg is not None:
+                            self.logger.debug(
+                                "Time delta between successively acked messages is large: " +
+                                "delta_t_sec: %s, prev_message: %s, current_message: %s" % (str(time_delta_sec), str(prev_msg), str(data)))
+
+                        last_produced_time = t
+                        prev_msg = data
+
+    def start_cmd(self, node):
+
+        cmd = ""
+        if node.version <= LATEST_0_8_2:
+            # 0.8.2.X releases do not have VerifiableProducer.java, so cheat and add
+            # the tools jar from trunk to the classpath
+            cmd += "for file in /opt/%s/tools/build/libs/kafka-tools*.jar; do CLASSPATH=$CLASSPATH:$file; done; " % KAFKA_TRUNK
+            cmd += "for file in /opt/%s/tools/build/dependant-libs-${SCALA_VERSION}*/*.jar; do CLASSPATH=$CLASSPATH:$file; done; " % KAFKA_TRUNK
+            cmd += "export CLASSPATH; "
+
+        cmd += "export LOG_DIR=%s;" % VerifiableProducer.LOG_DIR
+        cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG
+        cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer" \
               " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
@@ -78,9 +129,20 @@ class VerifiableProducer(BackgroundThreadService):
             cmd += " --throughput %s" % str(self.throughput)
 
         cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
-        cmd += " 2>> /mnt/producer.log | tee -a /mnt/producer.log &"
+        cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
         return cmd
 
+    def pids(self, node):
+        try:
+            cmd = "ps ax | grep -i VerifiableProducer | grep java | grep -v grep | awk '{print $1}'"
+            pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
+            return pid_arr
+        except (subprocess.CalledProcessError, ValueError) as e:
+            return []
+
+    def alive(self, node):
+        return len(self.pids(node)) > 0
+
     @property
     def acked(self):
         with self.lock:
@@ -113,7 +175,7 @@ class VerifiableProducer(BackgroundThreadService):
 
     def clean_node(self, node):
         node.account.kill_process("VerifiableProducer", clean_shutdown=False, allow_fail=False)
-        node.account.ssh("rm -rf /mnt/producer.log /mnt/verifiable_producer.properties", allow_fail=False)
+        node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
         self.security_config.clean_node(node)
 
     def try_parse_json(self, string):

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/zookeeper.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index 09bec35..9a9047c 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -16,6 +16,8 @@
 
 from ducktape.services.service import Service
 
+from kafkatest.services.kafka.directory import kafka_dir
+
 import subprocess
 import time
 
@@ -46,9 +48,9 @@ class ZookeeperService(Service):
         self.logger.info(config_file)
         node.account.create_file("/mnt/zookeeper.properties", config_file)
 
-        node.account.ssh(
-            "/opt/kafka/bin/zookeeper-server-start.sh /mnt/zookeeper.properties 1>> %(path)s 2>> %(path)s &"
-            % self.logs["zk_log"])
+        start_cmd = "/opt/%s/bin/zookeeper-server-start.sh " % kafka_dir(node)
+        start_cmd += "/mnt/zookeeper.properties 1>> %(path)s 2>> %(path)s &" % self.logs["zk_log"]
+        node.account.ssh(start_cmd)
 
         time.sleep(5)  # give it some time to start
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/tests/produce_consume_validate.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
new file mode 100644
index 0000000..aa2fe53
--- /dev/null
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+
+
+class ProduceConsumeValidateTest(Test):
+    """This class provides a shared template for tests which follow the common pattern of:
+
+        - produce to a topic in the background
+        - consume from that topic in the background
+        - run some logic, e.g. fail topic leader etc.
+        - perform validation
+    """
+
+    def __init__(self, test_context):
+        super(ProduceConsumeValidateTest, self).__init__(test_context=test_context)
+
+    def setup_producer_and_consumer(self):
+        raise NotImplementedError("Subclasses should implement this")
+
+    def start_producer_and_consumer(self):
+        # Start background producer and consumer
+        self.producer.start()
+        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=10,
+             err_msg="Producer failed to start in a reasonable amount of time.")
+        self.consumer.start()
+        wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=10,
+             err_msg="Consumer failed to start in a reasonable amount of time.")
+
+    def stop_producer_and_consumer(self):
+        for node in self.consumer.nodes:
+            if not self.consumer.alive(node):
+                self.logger.warn("Consumer on %s is not alive and probably should be." % str(node.account))
+        for node in self.producer.nodes:
+            if not self.producer.alive(node):
+                self.logger.warn("Producer on %s is not alive and probably should be." % str(node.account))
+
+        # Check that producer is still successfully producing
+        currently_acked = self.producer.num_acked
+        wait_until(lambda: self.producer.num_acked > currently_acked + 5, timeout_sec=10,
+             err_msg="Expected producer to still be producing.")
+
+        self.producer.stop()
+        self.consumer.wait()
+
+    def run_produce_consume_validate(self, core_test_action):
+        """Top-level template for simple produce/consume/validate tests."""
+
+        self.start_producer_and_consumer()
+        core_test_action()
+        self.stop_producer_and_consumer()
+        self.validate()
+
+    def validate(self):
+        """Check that each acked message was consumed."""
+
+        self.acked = self.producer.acked
+        self.not_acked = self.producer.not_acked
+
+        # Check produced vs consumed
+        self.consumed = self.consumer.messages_consumed[1]
+        self.logger.info("num consumed:  %d" % len(self.consumed))
+
+        success = True
+        msg = ""
+
+        if len(set(self.consumed)) != len(self.consumed):
+            # There are duplicates. This is ok, so report it but don't fail the test
+            msg += "There are duplicate messages in the log\n"
+
+        if not set(self.consumed).issuperset(set(self.acked)):
+            # Every acked message must appear in the logs. I.e. consumed messages must be superset of acked messages.
+            acked_minus_consumed = set(self.producer.acked) - set(self.consumed)
+            success = False
+
+            msg += "At least one acked message did not appear in the consumed messages. acked_minus_consumed: "
+            if len(acked_minus_consumed) < 20:
+                msg += str(acked_minus_consumed)
+            else:
+                for i in range(20):
+                    msg += str(acked_minus_consumed.pop()) + ", "
+                msg += "...plus " + str(len(acked_minus_consumed) - 20) + " more"
+
+        if not success:
+            # Collect all the data logs if there was a failure
+            self.mark_for_collect(self.kafka)
+
+        if not success:
+            self.mark_for_collect(self.producer)
+
+        assert success, msg
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/tests/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/quota_test.py b/tests/kafkatest/tests/quota_test.py
index 4ae2e08..6ba6aa7 100644
--- a/tests/kafkatest/tests/quota_test.py
+++ b/tests/kafkatest/tests/quota_test.py
@@ -14,18 +14,13 @@
 # limitations under the License.
 
 from ducktape.tests.test import Test
-from ducktape.utils.util import wait_until
 from ducktape.mark import parametrize
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.performance import ProducerPerformanceService
-from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+from kafkatest.services.console_consumer import ConsoleConsumer
 
-import random
-import signal
-import time
 
 class QuotaTest(Test):
     """
@@ -73,7 +68,10 @@ class QuotaTest(Test):
         """Override this since we're adding services outside of the constructor"""
         return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
 
-    def run_clients(self, producer_id, producer_num, consumer_id, consumer_num):
+    @parametrize(producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1)
+    @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=1)
+    @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=2)
+    def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1):
         # Produce all messages
         producer = ProducerPerformanceService(
             self.test_context, producer_num, self.kafka, security_protocol=self.security_protocol,
@@ -91,7 +89,7 @@ class QuotaTest(Test):
         consumer.run()
 
         for idx, messages in consumer.messages_consumed.iteritems():
-            assert len(messages)>0, "consumer %d didn't consume any message before timeout" % idx
+            assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx
 
         success, msg = self.validate(self.kafka, producer, consumer)
         assert success, msg
@@ -172,9 +170,3 @@ class QuotaTest(Test):
         if client_id in overridden_quotas:
             return float(overridden_quotas[client_id])
         return self.quota_config['quota_consumer_default']
-
-    @parametrize(producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1)
-    @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=1)
-    @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=2)
-    def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1):
-        self.run_clients(producer_id, producer_num, consumer_id, consumer_num)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/tests/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py
index d20cc22..16aa944 100644
--- a/tests/kafkatest/tests/replication_test.py
+++ b/tests/kafkatest/tests/replication_test.py
@@ -13,24 +13,76 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize
+
 from ducktape.mark import matrix
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 
 import signal
-import time
 
 
-class ReplicationTest(Test):
-    """Replication tests.
-    These tests verify that replication provides simple durability guarantees by checking that data acked by
-    brokers is still available for consumption in the face of various failure scenarios."""
+def clean_shutdown(test):
+    """Discover leader node for our topic and shut it down cleanly."""
+    test.kafka.signal_leader(test.topic, partition=0, sig=signal.SIGTERM)
+
+
+def hard_shutdown(test):
+    """Discover leader node for our topic and shut it down with a hard kill."""
+    test.kafka.signal_leader(test.topic, partition=0, sig=signal.SIGKILL)
+
+
+def clean_bounce(test):
+    """Chase the leader of one partition and restart it cleanly."""
+    for i in range(5):
+        prev_leader_node = test.kafka.leader(topic=test.topic, partition=0)
+        test.kafka.restart_node(prev_leader_node, clean_shutdown=True)
+
+
+def hard_bounce(test):
+    """Chase the leader and restart it with a hard kill."""
+    for i in range(5):
+        prev_leader_node = test.kafka.leader(topic=test.topic, partition=0)
+        test.kafka.signal_node(prev_leader_node, sig=signal.SIGKILL)
+
+        # Since this is a hard kill, we need to make sure the process is down and that
+        # zookeeper and the broker cluster have registered the loss of the leader.
+        # Waiting for a new leader to be elected on the topic-partition is a reasonable heuristic for this.
+
+        def leader_changed():
+            current_leader = test.kafka.leader(topic=test.topic, partition=0)
+            return current_leader is not None and current_leader != prev_leader_node
+
+        wait_until(lambda: len(test.kafka.pids(prev_leader_node)) == 0, timeout_sec=5)
+        wait_until(leader_changed, timeout_sec=10, backoff_sec=.5)
+        test.kafka.start_node(prev_leader_node)
+
+failures = {
+    "clean_shutdown": clean_shutdown,
+    "hard_shutdown": hard_shutdown,
+    "clean_bounce": clean_bounce,
+    "hard_bounce": hard_bounce
+}
+
+
+class ReplicationTest(ProduceConsumeValidateTest):
+    """
+    Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
+    (foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
+    too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose
+    ordering guarantees.
+
+    Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked,
+    we might exit early if some messages are duplicated (though not an issue here since producer retries==0)
+
+    Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively
+    consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
+    indicator that nothing is left to consume.
+    """
 
     def __init__(self, test_context):
         """:type test_context: ducktape.tests.test.TestContext"""
@@ -38,6 +90,11 @@ class ReplicationTest(Test):
 
         self.topic = "test_topic"
         self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
+                                                                    "partitions": 3,
+                                                                    "replication-factor": 3,
+                                                                    "min.insync.replicas": 2}
+                                                                })
         self.producer_throughput = 10000
         self.num_producers = 1
         self.num_consumers = 1
@@ -49,125 +106,27 @@ class ReplicationTest(Test):
         """Override this since we're adding services outside of the constructor"""
         return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
 
-    def run_with_failure(self, failure, interbroker_security_protocol):
-        """This is the top-level test template.
-
-        The steps are:
-            Produce messages in the background while driving some failure condition
-            When done driving failures, immediately stop producing
-            Consume all messages
-            Validate that messages acked by brokers were consumed
 
-        Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
-        (foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
-        too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose
-        ordering guarantees.
+    @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
+            interbroker_security_protocol=["PLAINTEXT", "SSL"])
+    def test_replication_with_broker_failure(self, failure_mode, interbroker_security_protocol="PLAINTEXT"):
+        """Replication tests.
+        These tests verify that replication provides simple durability guarantees by checking that data acked by
+        brokers is still available for consumption in the face of various failure scenarios.
 
-        Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked,
-        we might exit early if some messages are duplicated (though not an issue here since producer retries==0)
-
-        Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively
-        consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
-        indicator that nothing is left to consume.
+        Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
 
+            - Produce messages in the background
+            - Consume messages in the background
+            - Drive broker failures (shutdown, or bounce repeatedly with kill -15 or kill -9)
+            - When done driving failures, stop producing, and finish consuming
+            - Validate that every acked message was consumed
         """
-        security_protocol='PLAINTEXT'
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, 
-                                  security_protocol=security_protocol,
-                                  interbroker_security_protocol=interbroker_security_protocol,
-                                  topics={self.topic: {
-                                               "partitions": 3,
-                                               "replication-factor": 3,
-                                               "min.insync.replicas": 2}
-                                         })
+        client_security_protocol = 'PLAINTEXT'
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, security_protocol=client_security_protocol, throughput=self.producer_throughput)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, security_protocol=client_security_protocol, consumer_timeout_ms=60000, message_validator=is_int)
+
+        self.kafka.interbroker_security_protocol = interbroker_security_protocol
         self.kafka.start()
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, security_protocol=security_protocol, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, security_protocol=security_protocol, new_consumer=False, consumer_timeout_ms=3000, message_validator=is_int)
-
-        # Produce in a background thread while driving broker failures
-        self.producer.start()
-        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5,
-             err_msg="Producer failed to start in a reasonable amount of time.")
-        failure()
-        self.producer.stop()
-
-        self.acked = self.producer.acked
-        self.not_acked = self.producer.not_acked
-        self.logger.info("num not acked: %d" % self.producer.num_not_acked)
-        self.logger.info("num acked:     %d" % self.producer.num_acked)
-
-        # Consume all messages
-        self.consumer.start()
-        self.consumer.wait()
-        self.consumed = self.consumer.messages_consumed[1]
-        self.logger.info("num consumed:  %d" % len(self.consumed))
-
-        # Check produced vs consumed
-        success, msg = self.validate()
-
-        if not success:
-            self.mark_for_collect(self.producer)
-
-        assert success, msg
-
-    def clean_shutdown(self):
-        """Discover leader node for our topic and shut it down cleanly."""
-        self.kafka.signal_leader(self.topic, partition=0, sig=signal.SIGTERM)
-
-    def hard_shutdown(self):
-        """Discover leader node for our topic and shut it down with a hard kill."""
-        self.kafka.signal_leader(self.topic, partition=0, sig=signal.SIGKILL)
-
-    def clean_bounce(self):
-        """Chase the leader of one partition and restart it cleanly."""
-        for i in range(5):
-            prev_leader_node = self.kafka.leader(topic=self.topic, partition=0)
-            self.kafka.restart_node(prev_leader_node, wait_sec=5, clean_shutdown=True)
-
-    def hard_bounce(self):
-        """Chase the leader and restart it cleanly."""
-        for i in range(5):
-            prev_leader_node = self.kafka.leader(topic=self.topic, partition=0)
-            self.kafka.restart_node(prev_leader_node, wait_sec=5, clean_shutdown=False)
-
-            # Wait long enough for previous leader to probably be awake again
-            time.sleep(6)
-
-    def validate(self):
-        """Check that produced messages were consumed."""
-
-        success = True
-        msg = ""
-
-        if len(set(self.consumed)) != len(self.consumed):
-            # There are duplicates. This is ok, so report it but don't fail the test
-            msg += "There are duplicate messages in the log\n"
-
-        if not set(self.consumed).issuperset(set(self.acked)):
-            # Every acked message must appear in the logs. I.e. consumed messages must be superset of acked messages.
-            acked_minus_consumed = set(self.producer.acked) - set(self.consumed)
-            success = False
-            msg += "At least one acked message did not appear in the consumed messages. acked_minus_consumed: " + str(acked_minus_consumed)
-
-        if not success:
-            # Collect all the data logs if there was a failure
-            self.mark_for_collect(self.kafka)
-
-        return success, msg
-
-    
-    @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL'])
-    def test_clean_shutdown(self, interbroker_security_protocol):
-        self.run_with_failure(self.clean_shutdown, interbroker_security_protocol)
-
-    @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL'])
-    def test_hard_shutdown(self, interbroker_security_protocol):
-        self.run_with_failure(self.hard_shutdown, interbroker_security_protocol)
-
-    @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL'])
-    def test_clean_bounce(self, interbroker_security_protocol):
-        self.run_with_failure(self.clean_bounce, interbroker_security_protocol)
-
-    @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL'])
-    def test_hard_bounce(self, interbroker_security_protocol):
-        self.run_with_failure(self.hard_bounce, interbroker_security_protocol)
+        
+        self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/tests/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/upgrade_test.py b/tests/kafkatest/tests/upgrade_test.py
new file mode 100644
index 0000000..97605cd
--- /dev/null
+++ b/tests/kafkatest/tests/upgrade_test.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+from kafkatest.services.kafka import config_property
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+
+
+class TestUpgrade(ProduceConsumeValidateTest):
+
+    def __init__(self, test_context):
+        super(TestUpgrade, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=LATEST_0_8_2, topics={self.topic: {
+                                                                    "partitions": 3,
+                                                                    "replication-factor": 3,
+                                                                    "min.insync.replicas": 2}})
+        self.zk.start()
+        self.kafka.start()
+
+        # Producer and consumer
+        self.producer_throughput = 10000
+        self.num_producers = 1
+        self.num_consumers = 1
+        self.producer = VerifiableProducer(
+            self.test_context, self.num_producers, self.kafka, self.topic,
+            throughput=self.producer_throughput, version=LATEST_0_8_2)
+
+        # TODO - reduce the timeout
+        self.consumer = ConsoleConsumer(
+            self.test_context, self.num_consumers, self.kafka, self.topic,
+            consumer_timeout_ms=30000, message_validator=is_int, version=LATEST_0_8_2)
+
+    def perform_upgrade(self):
+        self.logger.info("First pass bounce - rolling upgrade")
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            node.version = TRUNK
+            node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = "0.8.2.X"
+            self.kafka.start_node(node)
+
+        self.logger.info("Second pass bounce - remove inter.broker.protocol.version config")
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
+            self.kafka.start_node(node)
+
+    def test_upgrade(self):
+        """Test upgrade of Kafka broker cluster from 0.8.2 to 0.9.0
+
+        - Start 3 node broker cluster on version 0.8.2
+        - Start producer and consumer in the background
+        - Perform two-phase rolling upgrade
+            - First phase: upgrade brokers to 0.9.0 with inter.broker.protocol.version set to 0.8.2.X
+            - Second phase: remove inter.broker.protocol.version config with rolling bounce
+        - Finally, validate that every message acked by the producer was consumed by the consumer
+        """
+
+        self.run_produce_consume_validate(core_test_action=self.perform_upgrade)
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/utils/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/utils/__init__.py b/tests/kafkatest/utils/__init__.py
index cff6d2b..46c71f0 100644
--- a/tests/kafkatest/utils/__init__.py
+++ b/tests/kafkatest/utils/__init__.py
@@ -12,4 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-# see kafka.server.KafkaConfig for additional details and defaults
\ No newline at end of file
+# see kafka.server.KafkaConfig for additional details and defaults
+
+from util import kafkatest_version, is_version
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/utils/util.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py
new file mode 100644
index 0000000..2b1e49c
--- /dev/null
+++ b/tests/kafkatest/utils/util.py
@@ -0,0 +1,42 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest import __version__ as __kafkatest_version__
+
+import re
+
+
+def kafkatest_version():
+    """Return string representation of current ducktape version."""
+    return __kafkatest_version__
+
+
+def _kafka_jar_versions(proc_string):
+    """Use a rough heuristic to find all kafka versions explicitly in the process classpath"""
+    versions = re.findall("kafka-[a-z]+-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string)
+    versions.extend(re.findall("kafka-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string))
+
+    return set(versions)
+
+
+def is_version(node, version_list, proc_grep_string="kafka"):
+    """Heuristic to check that only the specified version appears in the classpath of the process
+    A useful tool to aid in checking that service version apis are working correctly.
+    """
+    lines = [l for l in node.account.ssh_capture("ps ax | grep %s | grep -v grep" % proc_grep_string)]
+    assert len(lines) == 1
+
+    versions = _kafka_jar_versions(lines[0])
+    return versions == {str(v) for v in version_list}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/setup.py
----------------------------------------------------------------------
diff --git a/tests/setup.py b/tests/setup.py
index d637eb8..f555fd3 100644
--- a/tests/setup.py
+++ b/tests/setup.py
@@ -14,15 +14,21 @@
 # limitations under the License.
 # see kafka.server.KafkaConfig for additional details and defaults
 
+import re
 from setuptools import find_packages, setup
 
+version = ''
+with open('kafkatest/__init__.py', 'r') as fd:
+    version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]',
+                        fd.read(), re.MULTILINE).group(1)
+
 setup(name="kafkatest",
-      version="0.9.0.dev0",
+      version=version,
       description="Apache Kafka System Tests",
       author="Apache Kafka",
       platforms=["any"], 
       license="apache2.0",
       packages=find_packages(),
       include_package_data=True,
-      install_requires=["ducktape==0.3.2"]
+      install_requires=["ducktape==0.3.8"]
       )

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index dd695cf..0cd90c0 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -23,12 +23,14 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.utils.Utils;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -138,7 +140,29 @@ public class VerifiableProducer {
 
         return parser;
     }
-
+    
+    /**
+     * Read a properties file from the given path
+     * @param filename The path of the file to read
+     *                 
+     * Note: this duplication of org.apache.kafka.common.utils.Utils.loadProps is unfortunate 
+     * but *intentional*. In order to use VerifiableProducer in compatibility and upgrade tests, 
+     * we use VerifiableProducer from trunk tools package, and run it against 0.8.X.X kafka jars.
+     * Since this method is not in Utils in the 0.8.X.X jars, we have to cheat a bit and duplicate.
+     */
+    public static Properties loadProps(String filename) throws IOException, FileNotFoundException {
+        Properties props = new Properties();
+        InputStream propStream = null;
+        try {
+            propStream = new FileInputStream(filename);
+            props.load(propStream);
+        } finally {
+            if (propStream != null)
+                propStream.close();
+        }
+        return props;
+    }
+    
     /** Construct a VerifiableProducer object from command-line arguments. */
     public static VerifiableProducer createFromArgs(String[] args) {
         ArgumentParser parser = argParser();
@@ -164,7 +188,7 @@ public class VerifiableProducer {
             producerProps.put("retries", "0");
             if (configFile != null) {
                 try {
-                    producerProps.putAll(Utils.loadProps(configFile));
+                    producerProps.putAll(loadProps(configFile));
                 } catch (IOException e) {
                     throw new ArgumentParserException(e.getMessage(), parser);
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/vagrant/base.sh
----------------------------------------------------------------------
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 133f10a..2c2e5c2 100644
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -38,9 +38,31 @@ if [ -z `which javac` ]; then
 fi
 
 chmod a+rw /opt
-if [ ! -e /opt/kafka ]; then
-    ln -s /vagrant /opt/kafka
+if [ -h /opt/kafka-trunk ]; then
+    # reset symlink
+    rm /opt/kafka-trunk
 fi
+ln -s /vagrant /opt/kafka-trunk
+
+get_kafka() {
+    version=$1
+
+    kafka_dir=/opt/kafka-$version
+    url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_2.10-$version.tgz
+    if [ ! -d /opt/kafka-$version ]; then
+        pushd /tmp
+        curl -O $url
+        file_tgz=`basename $url`
+        tar -xzf $file_tgz
+        rm -rf $file_tgz
+
+        file=`basename $file_tgz .tgz`
+        mv $file $kafka_dir
+        popd
+    fi
+}
+
+get_kafka 0.8.2.2
 
 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local
 # VMs, we can just create it if it doesn't exist and use it like we'd use


[2/2] kafka git commit: KAFKA-1888: rolling upgrade test

Posted by gu...@apache.org.
KAFKA-1888: rolling upgrade test

ewencp gwenshap
This needs some refactoring to avoid the duplicated code between replication test and upgrade test, but in shape for initial feedback.

I'm interested in feedback on the added `KafkaConfig` class and `kafka_props` file. This addition makes it:
- easier to attach different configs to different nodes (e.g. during broker upgrade process)
- easier to reason about the configuration of a particular node

Notes:
- in the default values in the KafkaConfig class, I removed many properties which were in kafka.properties before. This is because most of those properties were set to what is already the default value.
- when running non-trunk VerifiableProducer, I append the trunk tools jar to the classpath, and run it with the non-trunk kafka-run-class.sh script

Author: Geoff Anderson <ge...@confluent.io>

Reviewers: Dong Lin, Ewen Cheslack-Postava

Closes #229 from granders/KAFKA-1888-upgrade-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e6b34330
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e6b34330
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e6b34330

Branch: refs/heads/trunk
Commit: e6b343302f3208f7f6e0099fe2a7132ef9eaaafb
Parents: af42c37
Author: Geoff Anderson <ge...@confluent.io>
Authored: Tue Oct 27 15:23:47 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Oct 27 15:23:47 2015 -0700

----------------------------------------------------------------------
 tests/kafkatest/__init__.py                     |  10 +
 .../sanity_checks/test_console_consumer.py      |  43 ++-
 .../sanity_checks/test_kafka_version.py         |  55 ++++
 .../sanity_checks/test_verifiable_producer.py   |  70 +++++
 tests/kafkatest/services/console_consumer.py    | 109 ++++---
 tests/kafkatest/services/copycat.py             |   9 +-
 tests/kafkatest/services/kafka.py               | 253 ----------------
 tests/kafkatest/services/kafka/__init__.py      |  16 +
 tests/kafkatest/services/kafka/config.py        |  53 ++++
 .../kafkatest/services/kafka/config_property.py | 177 +++++++++++
 tests/kafkatest/services/kafka/directory.py     |  32 ++
 tests/kafkatest/services/kafka/kafka.py         | 303 +++++++++++++++++++
 .../services/kafka/templates/kafka.properties   |  65 ++++
 tests/kafkatest/services/kafka/version.py       |  61 ++++
 .../kafkatest/services/kafka_log4j_appender.py  |  13 +-
 tests/kafkatest/services/mirror_maker.py        |   5 +-
 tests/kafkatest/services/monitor/__init__.py    |  14 +
 tests/kafkatest/services/monitor/jmx.py         |  90 ++++++
 .../performance/consumer_performance.py         |   8 +-
 .../services/performance/end_to_end_latency.py  |   8 +-
 .../kafkatest/services/performance/jmx_mixin.py |  81 -----
 .../performance/producer_performance.py         |  44 +--
 .../services/templates/kafka.properties         |  74 -----
 tests/kafkatest/services/verifiable_producer.py |  88 +++++-
 tests/kafkatest/services/zookeeper.py           |   8 +-
 .../kafkatest/tests/produce_consume_validate.py | 106 +++++++
 tests/kafkatest/tests/quota_test.py             |  20 +-
 tests/kafkatest/tests/replication_test.py       | 207 +++++--------
 tests/kafkatest/tests/upgrade_test.py           |  81 +++++
 tests/kafkatest/utils/__init__.py               |   4 +-
 tests/kafkatest/utils/util.py                   |  42 +++
 tests/setup.py                                  |  10 +-
 .../apache/kafka/tools/VerifiableProducer.java  |  30 +-
 vagrant/base.sh                                 |  26 +-
 34 files changed, 1561 insertions(+), 654 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py
index 28d269b..e346811 100644
--- a/tests/kafkatest/__init__.py
+++ b/tests/kafkatest/__init__.py
@@ -14,3 +14,13 @@
 # limitations under the License.
 # see kafka.server.KafkaConfig for additional details and defaults
 
+# This determines the version of kafkatest that can be published to PyPi and installed with pip
+#
+# Note that in development, this version name can't follow Kafka's convention of having a trailing "-SNAPSHOT"
+# due to python version naming restrictions, which are enforced by python packaging tools
+# (see  https://www.python.org/dev/peps/pep-0440/)
+#
+# Instead, in trunk, the version should have a suffix of the form ".devN"
+#
+# For example, when Kafka is at version 0.9.0.0-SNAPSHOT, this should be something like "0.9.0.0.dev0"
+__version__ = '0.9.0.0.dev0'

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/sanity_checks/test_console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index a9c4d53..0d2c1fd 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -20,11 +20,16 @@ from ducktape.mark import matrix
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import LATEST_0_8_2
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.utils.remote_account import line_count, file_exists
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.utils.security_config import SecurityConfig
+
 
 import time
 
+
 class ConsoleConsumerTest(Test):
     """Sanity checks on console consumer service class."""
     def __init__(self, test_context):
@@ -32,24 +37,29 @@ class ConsoleConsumerTest(Test):
 
         self.topic = "topic"
         self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
+                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
+        self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic)
 
     def setUp(self):
         self.zk.start()
 
-    @parametrize(security_protocol='SSL', new_consumer=True)
-    @matrix(security_protocol=['PLAINTEXT'], new_consumer=[False, True])
+    @parametrize(security_protocol=SecurityConfig.SSL, new_consumer=True)
+    @matrix(security_protocol=[SecurityConfig.PLAINTEXT], new_consumer=[False, True])
     def test_lifecycle(self, security_protocol, new_consumer):
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
-                                  security_protocol=security_protocol,
-                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
+        """Check that console consumer starts/stops properly, and that we are capturing log output."""
+
+        self.kafka.security_protocol = security_protocol
         self.kafka.start()
 
+        self.consumer.security_protocol = security_protocol
+        self.consumer.new_consumer = new_consumer
+
         t0 = time.time()
-        self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, security_protocol=security_protocol, new_consumer=new_consumer)
         self.consumer.start()
         node = self.consumer.nodes[0]
 
-        wait_until(lambda: self.consumer.alive(node), 
+        wait_until(lambda: self.consumer.alive(node),
             timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
         self.logger.info("consumer started in %s seconds " % str(time.time() - t0))
 
@@ -62,3 +72,22 @@ class ConsoleConsumerTest(Test):
         assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0
 
         self.consumer.stop_node(node)
+
+    def test_version(self):
+        """Check that console consumer v0.8.2.X successfully starts and consumes messages."""
+        self.kafka.start()
+
+        num_messages = 1000
+        self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
+                                           max_messages=num_messages, throughput=1000)
+        self.producer.start()
+        self.producer.wait()
+
+        self.consumer.nodes[0].version = LATEST_0_8_2
+        self.consumer.consumer_timeout_ms = 1000
+        self.consumer.start()
+        self.consumer.wait()
+
+        num_consumed = len(self.consumer.messages_consumed[1])
+        num_produced = self.producer.num_acked
+        assert num_produced == num_consumed, "num_produced: %d, num_consumed: %d" % (num_produced, num_consumed)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/sanity_checks/test_kafka_version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_kafka_version.py b/tests/kafkatest/sanity_checks/test_kafka_version.py
new file mode 100644
index 0000000..f5f5d5f
--- /dev/null
+++ b/tests/kafkatest/sanity_checks/test_kafka_version.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService, config_property
+from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK
+from kafkatest.utils import is_version
+
+
+class KafkaVersionTest(Test):
+    """Sanity checks on kafka versioning."""
+    def __init__(self, test_context):
+        super(KafkaVersionTest, self).__init__(test_context)
+
+        self.topic = "topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+
+    def setUp(self):
+        self.zk.start()
+
+    def test_0_8_2(self):
+        """Test kafka service node-versioning api - verify that we can bring up a single-node 0.8.2.X cluster."""
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
+                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
+        node = self.kafka.nodes[0]
+        node.version = LATEST_0_8_2
+        self.kafka.start()
+
+        assert is_version(node, [LATEST_0_8_2])
+
+    def test_multi_version(self):
+        """Test kafka service node-versioning api - ensure we can bring up a 2-node cluster, one on version 0.8.2.X,
+        the other on trunk."""
+        self.kafka = KafkaService(self.test_context, num_nodes=2, zk=self.zk,
+                                  topics={self.topic: {"partitions": 1, "replication-factor": 2}})
+        self.kafka.nodes[1].version = LATEST_0_8_2
+        self.kafka.nodes[1].config[config_property.INTER_BROKER_PROTOCOL_VERSION] = "0.8.2.X"
+        self.kafka.start()
+
+        assert is_version(self.kafka.nodes[0], [TRUNK.vstring])
+        assert is_version(self.kafka.nodes[1], [LATEST_0_8_2])

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/sanity_checks/test_verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
new file mode 100644
index 0000000..4155279
--- /dev/null
+++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from ducktape.mark import parametrize
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK, KafkaVersion
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.utils import is_version
+
+
+class TestVerifiableProducer(Test):
+    """Sanity checks on verifiable producer service class."""
+    def __init__(self, test_context):
+        super(TestVerifiableProducer, self).__init__(test_context)
+
+        self.topic = "topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
+                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
+
+        self.num_messages = 100
+        # This will produce to source kafka cluster
+        self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
+                                           max_messages=self.num_messages, throughput=1000)
+
+    def setUp(self):
+        self.zk.start()
+        self.kafka.start()
+
+    @parametrize(producer_version=str(LATEST_0_8_2))
+    @parametrize(producer_version=str(TRUNK))
+    def test_simple_run(self, producer_version=TRUNK):
+        """
+        Test that we can start VerifiableProducer on trunk or against the 0.8.2 jar, and
+        verify that we can produce a small number of messages.
+        """
+        node = self.producer.nodes[0]
+        node.version = KafkaVersion(producer_version)
+        self.producer.start()
+        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5,
+             err_msg="Producer failed to start in a reasonable amount of time.")
+
+        # using version.vstring (distutils.version.LooseVersion) is a tricky way of ensuring
+        # that this check works with TRUNK
+        # When running VerifiableProducer 0.8.X, both trunk version and 0.8.X should show up because of the way
+        # verifiable producer pulls in some trunk directories into its classpath
+        assert is_version(node, [node.version.vstring, TRUNK.vstring])
+
+        self.producer.wait()
+        num_produced = self.producer.num_acked
+        assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 96fe777..07343e8 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -13,15 +13,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.services.background_thread import BackgroundThreadService
 from ducktape.utils.util import wait_until
-from kafkatest.services.performance.jmx_mixin import JmxMixin
-from kafkatest.services.performance import PerformanceService
+from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.services.kafka.directory import kafka_dir
+from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2
+from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.utils.security_config import SecurityConfig
 
+import itertools
 import os
 import subprocess
-import itertools
+
 
 def is_int(msg):
     """Default method used to check whether text pulled from console consumer is a message.
@@ -30,7 +33,7 @@ def is_int(msg):
     """
     try:
         return int(msg)
-    except:
+    except ValueError:
         return None
 
 """
@@ -74,7 +77,7 @@ Option                                  Description
 """
 
 
-class ConsoleConsumer(JmxMixin, PerformanceService):
+class ConsoleConsumer(JmxMixin, BackgroundThreadService):
     # Root directory for persistent output
     PERSISTENT_ROOT = "/mnt/console_consumer"
     STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stdout")
@@ -94,10 +97,10 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
         "consumer_log": {
             "path": LOG_FILE,
             "collect_default": True}
-        }
+    }
 
-    def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, new_consumer=None, message_validator=None,
-                 from_beginning=True, consumer_timeout_ms=None, client_id="console-consumer", jmx_object_names=None, jmx_attributes=[]):
+    def __init__(self, context, num_nodes, kafka, topic, security_protocol=SecurityConfig.PLAINTEXT, new_consumer=False, message_validator=None,
+                 from_beginning=True, consumer_timeout_ms=None, version=TRUNK, client_id="console-consumer", jmx_object_names=None, jmx_attributes=[]):
         """
         Args:
             context:                    standard context
@@ -114,7 +117,7 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
                                         in a topic.
         """
         JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
-        PerformanceService.__init__(self, context, num_nodes)
+        BackgroundThreadService.__init__(self, context, num_nodes)
         self.kafka = kafka
         self.new_consumer = new_consumer
         self.args = {
@@ -122,47 +125,70 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
         }
 
         self.consumer_timeout_ms = consumer_timeout_ms
+        for node in self.nodes:
+            node.version = version
 
         self.from_beginning = from_beginning
         self.message_validator = message_validator
         self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
         self.client_id = client_id
+        self.security_protocol = security_protocol
+
+        # Validate a few configs
+        if self.new_consumer is None:
+            self.new_consumer = self.security_protocol == SecurityConfig.SSL
+        if self.security_protocol == SecurityConfig.SSL and not self.new_consumer:
+            raise Exception("SSL protocol is supported only with the new consumer")
 
+    def prop_file(self, node):
+        """Return a string which can be used to create a configuration file appropriate for the given node."""
         # Process client configuration
-        self.prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms, client_id=self.client_id)
+        prop_file = self.render('console_consumer.properties')
+        if hasattr(node, "version") and node.version <= LATEST_0_8_2:
+            # in 0.8.2.X and earlier, console consumer does not have --timeout-ms option
+            # instead, we have to pass it through the config file
+            prop_file += "\nconsumer.timeout.ms=%s\n" % str(self.consumer_timeout_ms)
 
         # Add security properties to the config. If security protocol is not specified,
         # use the default in the template properties.
-        self.security_config = SecurityConfig(security_protocol, self.prop_file)
+        self.security_config = SecurityConfig(self.security_protocol, prop_file)
         self.security_protocol = self.security_config.security_protocol
-        if self.new_consumer is None:
-            self.new_consumer = self.security_protocol == SecurityConfig.SSL
-        if self.security_protocol == SecurityConfig.SSL and not self.new_consumer:
-            raise Exception("SSL protocol is supported only with the new consumer")
-        self.prop_file += str(self.security_config)
 
-    @property
-    def start_cmd(self):
+        prop_file += str(self.security_config)
+        return prop_file
+
+    def start_cmd(self, node):
+        """Return the start command appropriate for the given node."""
         args = self.args.copy()
         args['zk_connect'] = self.kafka.zk.connect_setting()
         args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
         args['stderr'] = ConsoleConsumer.STDERR_CAPTURE
+        args['log_dir'] = ConsoleConsumer.LOG_DIR
+        args['log4j_config'] = ConsoleConsumer.LOG4J_CONFIG
         args['config_file'] = ConsoleConsumer.CONFIG_FILE
+        args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
         args['jmx_port'] = self.jmx_port
+        args['kafka_dir'] = kafka_dir(node)
+        args['broker_list'] = self.kafka.bootstrap_servers()
 
-        cmd = "export LOG_DIR=%s;" % ConsoleConsumer.LOG_DIR
-        cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsoleConsumer.LOG4J_CONFIG
-        cmd += " JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s" \
-            " --consumer.config %(config_file)s" % args
+        cmd = "export JMX_PORT=%(jmx_port)s; " \
+              "export LOG_DIR=%(log_dir)s; " \
+              "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j_config)s\"; " \
+              "/opt/%(kafka_dir)s/bin/kafka-console-consumer.sh " \
+              "--topic %(topic)s --consumer.config %(config_file)s" % args
 
         if self.new_consumer:
-            cmd += " --new-consumer --bootstrap-server %s"  % self.kafka.bootstrap_servers()
+            cmd += " --new-consumer --bootstrap-server %(broker_list)s" % args
         else:
             cmd += " --zookeeper %(zk_connect)s" % args
         if self.from_beginning:
             cmd += " --from-beginning"
+
         if self.consumer_timeout_ms is not None:
-            cmd += " --timeout-ms %s" % self.consumer_timeout_ms
+            # version 0.8.X and below do not support --timeout-ms option
+            # This will be added in the properties file instead
+            if node.version > LATEST_0_8_2:
+                cmd += " --timeout-ms %s" % self.consumer_timeout_ms
 
         cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
         return cmd
@@ -183,8 +209,10 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
 
         # Create and upload config file
         self.logger.info("console_consumer.properties:")
-        self.logger.info(self.prop_file)
-        node.account.create_file(ConsoleConsumer.CONFIG_FILE, self.prop_file)
+
+        prop_file = self.prop_file(node)
+        self.logger.info(prop_file)
+        node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
         self.security_config.setup_node(node)
 
         # Create and upload log properties
@@ -192,23 +220,26 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
         node.account.create_file(ConsoleConsumer.LOG4J_CONFIG, log_config)
 
         # Run and capture output
-        cmd = self.start_cmd
+        cmd = self.start_cmd(node)
         self.logger.debug("Console consumer %d command: %s", idx, cmd)
 
         consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
-        first_line = consumer_output.next()
-        self.start_jmx_tool(idx, node)
-        for line in itertools.chain([first_line], consumer_output):
-            msg = line.strip()
-            if self.message_validator is not None:
-                msg = self.message_validator(msg)
-            if msg is not None:
-                self.messages_consumed[idx].append(msg)
+        first_line = next(consumer_output, None)
+
+        if first_line is not None:
+            self.start_jmx_tool(idx, node)
+
+            for line in itertools.chain([first_line], consumer_output):
+                msg = line.strip()
+                if self.message_validator is not None:
+                    msg = self.message_validator(msg)
+                if msg is not None:
+                    self.messages_consumed[idx].append(msg)
 
-        self.read_jmx_output(idx, node)
+            self.read_jmx_output(idx, node)
 
     def start_node(self, node):
-        PerformanceService.start_node(self, node)
+        BackgroundThreadService.start_node(self, node)
 
     def stop_node(self, node):
         node.account.kill_process("console_consumer", allow_fail=True)
@@ -220,6 +251,6 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
             self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
                              (self.__class__.__name__, node.account))
         JmxMixin.clean_node(self, node)
-        PerformanceService.clean_node(self, node)
+        node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
         node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
         self.security_config.clean_node(node)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/copycat.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/copycat.py b/tests/kafkatest/services/copycat.py
index 45ef330..831a932 100644
--- a/tests/kafkatest/services/copycat.py
+++ b/tests/kafkatest/services/copycat.py
@@ -15,8 +15,9 @@
 
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
-import subprocess, signal
 
+from kafkatest.services.kafka.directory import kafka_dir
+import signal
 
 class CopycatServiceBase(Service):
     """Base class for Copycat services providing some common settings and functionality"""
@@ -99,7 +100,7 @@ class CopycatStandaloneService(CopycatServiceBase):
 
         self.logger.info("Starting Copycat standalone process")
         with node.account.monitor_log("/mnt/copycat.log") as monitor:
-            node.account.ssh("/opt/kafka/bin/copycat-standalone.sh /mnt/copycat.properties " +
+            node.account.ssh("/opt/%s/bin/copycat-standalone.sh /mnt/copycat.properties " % kafka_dir(node) +
                              " ".join(remote_connector_configs) +
                              " 1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! > /mnt/copycat.pid")
             monitor.wait_until('Copycat started', timeout_sec=10, err_msg="Never saw message indicating Copycat finished startup")
@@ -108,7 +109,6 @@ class CopycatStandaloneService(CopycatServiceBase):
             raise RuntimeError("No process ids recorded")
 
 
-
 class CopycatDistributedService(CopycatServiceBase):
     """Runs Copycat in distributed mode."""
 
@@ -128,7 +128,7 @@ class CopycatDistributedService(CopycatServiceBase):
 
         self.logger.info("Starting Copycat distributed process")
         with node.account.monitor_log("/mnt/copycat.log") as monitor:
-            cmd = "/opt/kafka/bin/copycat-distributed.sh /mnt/copycat.properties "
+            cmd = "/opt/%s/bin/copycat-distributed.sh /mnt/copycat.properties " % kafka_dir(node)
             # Only submit connectors on the first node so they don't get submitted multiple times. Also only submit them
             # the first time the node is started so
             if self.first_start and node == self.nodes[0]:
@@ -140,4 +140,3 @@ class CopycatDistributedService(CopycatServiceBase):
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")
 
-        self.first_start = False

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka.py b/tests/kafkatest/services/kafka.py
deleted file mode 100644
index 5c4b22f..0000000
--- a/tests/kafkatest/services/kafka.py
+++ /dev/null
@@ -1,253 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.services.service import Service
-from ducktape.utils.util import wait_until
-from kafkatest.services.performance.jmx_mixin import JmxMixin
-from kafkatest.utils.security_config import SecurityConfig
-import json
-import re
-import signal
-import time
-
-
-class KafkaService(JmxMixin, Service):
-
-    logs = {
-        "kafka_log": {
-            "path": "/mnt/kafka.log",
-            "collect_default": True},
-        "kafka_data": {
-            "path": "/mnt/kafka-logs",
-            "collect_default": False}
-    }
-
-    def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
-                 topics=None, quota_config=None, jmx_object_names=None, jmx_attributes=[]):
-        """
-        :type context
-        :type zk: ZookeeperService
-        :type topics: dict
-        """
-        Service.__init__(self, context, num_nodes)
-        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
-        self.zk = zk
-        if security_protocol == SecurityConfig.SSL or interbroker_security_protocol == SecurityConfig.SSL:
-            self.security_config = SecurityConfig(SecurityConfig.SSL)
-        else:
-            self.security_config = SecurityConfig(SecurityConfig.PLAINTEXT)
-        self.security_protocol = security_protocol
-        self.interbroker_security_protocol = interbroker_security_protocol
-        self.port = 9092 if security_protocol == SecurityConfig.PLAINTEXT else 9093
-        self.topics = topics
-        self.quota_config = quota_config
-
-    def start(self):
-        Service.start(self)
-
-        # Create topics if necessary
-        if self.topics is not None:
-            for topic, topic_cfg in self.topics.items():
-                if topic_cfg is None:
-                    topic_cfg = {}
-
-                topic_cfg["topic"] = topic
-                self.create_topic(topic_cfg)
-
-    def start_node(self, node):
-        props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node),
-            port = self.port, security_protocol = self.security_protocol, quota_config=self.quota_config,
-            interbroker_security_protocol=self.interbroker_security_protocol)
-        self.logger.info("kafka.properties:")
-        self.logger.info(props_file)
-        node.account.create_file("/mnt/kafka.properties", props_file)
-        self.security_config.setup_node(node)
-
-        cmd = "JMX_PORT=%d /opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid" % self.jmx_port
-        self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
-        with node.account.monitor_log("/mnt/kafka.log") as monitor:
-            node.account.ssh(cmd)
-            monitor.wait_until("Kafka Server.*started", timeout_sec=30, err_msg="Kafka server didn't finish startup")
-        self.start_jmx_tool(self.idx(node), node)
-        if len(self.pids(node)) == 0:
-            raise Exception("No process ids recorded on node %s" % str(node))
-
-    def pids(self, node):
-        """Return process ids associated with running processes on the given node."""
-        try:
-            return [pid for pid in node.account.ssh_capture("cat /mnt/kafka.pid", callback=int)]
-        except:
-            return []
-
-    def signal_node(self, node, sig=signal.SIGTERM):
-        pids = self.pids(node)
-        for pid in pids:
-            node.account.signal(pid, sig)
-
-    def signal_leader(self, topic, partition=0, sig=signal.SIGTERM):
-        leader = self.leader(topic, partition)
-        self.signal_node(leader, sig)
-
-    def stop_node(self, node, clean_shutdown=True):
-        pids = self.pids(node)
-        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
-
-        for pid in pids:
-            node.account.signal(pid, sig, allow_fail=False)
-
-        node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False)
-
-    def clean_node(self, node):
-        JmxMixin.clean_node(self, node)
-        node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
-        node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False)
-        self.security_config.clean_node(node)
-
-    def create_topic(self, topic_cfg):
-        node = self.nodes[0] # any node is fine here
-        self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg)
-
-        cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %(zk_connect)s --create "\
-            "--topic %(topic)s --partitions %(partitions)d --replication-factor %(replication)d" % {
-                'zk_connect': self.zk.connect_setting(),
-                'topic': topic_cfg.get("topic"),
-                'partitions': topic_cfg.get('partitions', 1),
-                'replication': topic_cfg.get('replication-factor', 1)
-            }
-
-        if "configs" in topic_cfg.keys() and topic_cfg["configs"] is not None:
-            for config_name, config_value in topic_cfg["configs"].items():
-                cmd += " --config %s=%s" % (config_name, str(config_value))
-
-        self.logger.info("Running topic creation command...\n%s" % cmd)
-        node.account.ssh(cmd)
-
-        time.sleep(1)
-        self.logger.info("Checking to see if topic was properly created...\n%s" % cmd)
-        for line in self.describe_topic(topic_cfg["topic"]).split("\n"):
-            self.logger.info(line)
-
-    def describe_topic(self, topic):
-        node = self.nodes[0]
-        cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %s --topic %s --describe" % \
-              (self.zk.connect_setting(), topic)
-        output = ""
-        for line in node.account.ssh_capture(cmd):
-            output += line
-        return output
-
-    def verify_reassign_partitions(self, reassignment):
-        """Run the reassign partitions admin tool in "verify" mode
-        """
-        node = self.nodes[0]
-        json_file = "/tmp/" + str(time.time()) + "_reassign.json"
-
-        # reassignment to json
-        json_str = json.dumps(reassignment)
-        json_str = json.dumps(json_str)
-
-        # create command
-        cmd = "echo %s > %s && " % (json_str, json_file)
-        cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\
-                "--zookeeper %(zk_connect)s "\
-                "--reassignment-json-file %(reassignment_file)s "\
-                "--verify" % {'zk_connect': self.zk.connect_setting(),
-                                'reassignment_file': json_file}
-        cmd += " && sleep 1 && rm -f %s" % json_file
-
-        # send command
-        self.logger.info("Verifying parition reassignment...")
-        self.logger.debug(cmd)
-        output = ""
-        for line in node.account.ssh_capture(cmd):
-            output += line
-
-        self.logger.debug(output)
-
-        if re.match(".*is in progress.*", output) is not None:
-            return False
-
-        return True
-
-    def execute_reassign_partitions(self, reassignment):
-        """Run the reassign partitions admin tool in "verify" mode
-        """
-        node = self.nodes[0]
-        json_file = "/tmp/" + str(time.time()) + "_reassign.json"
-
-        # reassignment to json
-        json_str = json.dumps(reassignment)
-        json_str = json.dumps(json_str)
-
-        # create command
-        cmd = "echo %s > %s && " % (json_str, json_file)
-        cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\
-                "--zookeeper %(zk_connect)s "\
-                "--reassignment-json-file %(reassignment_file)s "\
-                "--execute" % {'zk_connect': self.zk.connect_setting(),
-                                'reassignment_file': json_file}
-        cmd += " && sleep 1 && rm -f %s" % json_file
-
-        # send command
-        self.logger.info("Executing parition reassignment...")
-        self.logger.debug(cmd)
-        output = ""
-        for line in node.account.ssh_capture(cmd):
-            output += line
-
-        self.logger.debug("Verify partition reassignment:")
-        self.logger.debug(output)
-
-    def restart_node(self, node, wait_sec=0, clean_shutdown=True):
-        """Restart the given node, waiting wait_sec in between stopping and starting up again."""
-        self.stop_node(node, clean_shutdown)
-        time.sleep(wait_sec)
-        self.start_node(node)
-
-    def leader(self, topic, partition=0):
-        """ Get the leader replica for the given topic and partition.
-        """
-        cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " \
-              % self.zk.connect_setting()
-        cmd += "get /brokers/topics/%s/partitions/%d/state" % (topic, partition)
-        self.logger.debug(cmd)
-
-        node = self.nodes[0]
-        self.logger.debug("Querying zookeeper to find leader replica for topic %s: \n%s" % (cmd, topic))
-        partition_state = None
-        for line in node.account.ssh_capture(cmd):
-            match = re.match("^({.+})$", line)
-            if match is not None:
-                partition_state = match.groups()[0]
-                break
-
-        if partition_state is None:
-            raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
-
-        partition_state = json.loads(partition_state)
-        self.logger.info(partition_state)
-
-        leader_idx = int(partition_state["leader"])
-        self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx))
-        return self.get_node(leader_idx)
-
-    def bootstrap_servers(self):
-        """Get the broker list to connect to Kafka using the specified security protocol
-        """
-        return ','.join([node.account.hostname + ":" + `self.port` for node in self.nodes])
-
-    def read_jmx_output_all_nodes(self):
-        for node in self.nodes:
-            self.read_jmx_output(self.idx(node), node)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/kafka/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/__init__.py b/tests/kafkatest/services/kafka/__init__.py
new file mode 100644
index 0000000..6408b59
--- /dev/null
+++ b/tests/kafkatest/services/kafka/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafka import KafkaService

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/kafka/config.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/config.py b/tests/kafkatest/services/kafka/config.py
new file mode 100644
index 0000000..0accf20
--- /dev/null
+++ b/tests/kafkatest/services/kafka/config.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import config_property
+
+
+class KafkaConfig(dict):
+    """A dictionary-like container class which allows for definition of overridable default values,
+    which is also capable of "rendering" itself as a useable server.properties file.
+    """
+
+    DEFAULTS = {
+        config_property.PORT: 9092,
+        config_property.SOCKET_RECEIVE_BUFFER_BYTES: 65536,
+        config_property.LOG_DIRS: "/mnt/kafka-logs",
+        config_property.ZOOKEEPER_CONNECTION_TIMEOUT_MS: 2000
+    }
+
+    def __init__(self, **kwargs):
+        super(KafkaConfig, self).__init__(**kwargs)
+
+        # Set defaults
+        for key, val in self.DEFAULTS.items():
+            if not self.has_key(key):
+                self[key] = val
+
+    def render(self):
+        """Render self as a series of lines key=val\n, and do so in a consistent order. """
+        keys = [k for k in self.keys()]
+        keys.sort()
+
+        s = ""
+        for k in keys:
+            s += "%s=%s\n" % (k, str(self[k]))
+        return s
+
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/kafka/config_property.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
new file mode 100644
index 0000000..cc685aa
--- /dev/null
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Define Kafka configuration property names here.
+"""
+
+BROKER_ID = "broker.id"
+PORT = "port"
+ADVERTISED_HOSTNAME = "advertised.host.name"
+
+NUM_NETWORK_THREADS = "num.network.threads"
+NUM_IO_THREADS = "num.io.threads"
+SOCKET_SEND_BUFFER_BYTES = "socket.send.buffer.bytes"
+SOCKET_RECEIVE_BUFFER_BYTES = "socket.receive.buffer.bytes"
+SOCKET_REQUEST_MAX_BYTES = "socket.request.max.bytes"
+LOG_DIRS = "log.dirs"
+NUM_PARTITIONS = "num.partitions"
+NUM_RECOVERY_THREADS_PER_DATA_DIR = "num.recovery.threads.per.data.dir"
+
+LOG_RETENTION_HOURS = "log.retention.hours"
+LOG_SEGMENT_BYTES = "log.segment.bytes"
+LOG_RETENTION_CHECK_INTERVAL_MS = "log.retention.check.interval.ms"
+LOG_CLEANER_ENABLE = "log.cleaner.enable"
+
+AUTO_CREATE_TOPICS_ENABLE = "auto.create.topics.enable"
+
+ZOOKEEPER_CONNECT = "zookeeper.connect"
+ZOOKEEPER_CONNECTION_TIMEOUT_MS = "zookeeper.connection.timeout.ms"
+INTER_BROKER_PROTOCOL_VERSION = "inter.broker.protocol.version"
+
+
+"""
+From KafkaConfig.scala
+
+  /** ********* General Configuration ***********/
+  val MaxReservedBrokerIdProp = "reserved.broker.max.id"
+  val MessageMaxBytesProp = "message.max.bytes"
+  val NumIoThreadsProp = "num.io.threads"
+  val BackgroundThreadsProp = "background.threads"
+  val QueuedMaxRequestsProp = "queued.max.requests"
+  /** ********* Socket Server Configuration ***********/
+  val PortProp = "port"
+  val HostNameProp = "host.name"
+  val ListenersProp = "listeners"
+  val AdvertisedPortProp = "advertised.port"
+  val AdvertisedListenersProp = "advertised.listeners"
+  val SocketSendBufferBytesProp = "socket.send.buffer.bytes"
+  val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes"
+  val SocketRequestMaxBytesProp = "socket.request.max.bytes"
+  val MaxConnectionsPerIpProp = "max.connections.per.ip"
+  val MaxConnectionsPerIpOverridesProp = "max.connections.per.ip.overrides"
+  val ConnectionsMaxIdleMsProp = "connections.max.idle.ms"
+  /** ********* Log Configuration ***********/
+  val NumPartitionsProp = "num.partitions"
+  val LogDirsProp = "log.dirs"
+  val LogDirProp = "log.dir"
+  val LogSegmentBytesProp = "log.segment.bytes"
+
+  val LogRollTimeMillisProp = "log.roll.ms"
+  val LogRollTimeHoursProp = "log.roll.hours"
+
+  val LogRollTimeJitterMillisProp = "log.roll.jitter.ms"
+  val LogRollTimeJitterHoursProp = "log.roll.jitter.hours"
+
+  val LogRetentionTimeMillisProp = "log.retention.ms"
+  val LogRetentionTimeMinutesProp = "log.retention.minutes"
+  val LogRetentionTimeHoursProp = "log.retention.hours"
+
+  val LogRetentionBytesProp = "log.retention.bytes"
+  val LogCleanupIntervalMsProp = "log.retention.check.interval.ms"
+  val LogCleanupPolicyProp = "log.cleanup.policy"
+  val LogCleanerThreadsProp = "log.cleaner.threads"
+  val LogCleanerIoMaxBytesPerSecondProp = "log.cleaner.io.max.bytes.per.second"
+  val LogCleanerDedupeBufferSizeProp = "log.cleaner.dedupe.buffer.size"
+  val LogCleanerIoBufferSizeProp = "log.cleaner.io.buffer.size"
+  val LogCleanerDedupeBufferLoadFactorProp = "log.cleaner.io.buffer.load.factor"
+  val LogCleanerBackoffMsProp = "log.cleaner.backoff.ms"
+  val LogCleanerMinCleanRatioProp = "log.cleaner.min.cleanable.ratio"
+  val LogCleanerEnableProp = "log.cleaner.enable"
+  val LogCleanerDeleteRetentionMsProp = "log.cleaner.delete.retention.ms"
+  val LogIndexSizeMaxBytesProp = "log.index.size.max.bytes"
+  val LogIndexIntervalBytesProp = "log.index.interval.bytes"
+  val LogFlushIntervalMessagesProp = "log.flush.interval.messages"
+  val LogDeleteDelayMsProp = "log.segment.delete.delay.ms"
+  val LogFlushSchedulerIntervalMsProp = "log.flush.scheduler.interval.ms"
+  val LogFlushIntervalMsProp = "log.flush.interval.ms"
+  val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms"
+  val LogPreAllocateProp = "log.preallocate"
+  val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir"
+  val MinInSyncReplicasProp = "min.insync.replicas"
+  /** ********* Replication configuration ***********/
+  val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms"
+  val DefaultReplicationFactorProp = "default.replication.factor"
+  val ReplicaLagTimeMaxMsProp = "replica.lag.time.max.ms"
+  val ReplicaSocketTimeoutMsProp = "replica.socket.timeout.ms"
+  val ReplicaSocketReceiveBufferBytesProp = "replica.socket.receive.buffer.bytes"
+  val ReplicaFetchMaxBytesProp = "replica.fetch.max.bytes"
+  val ReplicaFetchWaitMaxMsProp = "replica.fetch.wait.max.ms"
+  val ReplicaFetchMinBytesProp = "replica.fetch.min.bytes"
+  val ReplicaFetchBackoffMsProp = "replica.fetch.backoff.ms"
+  val NumReplicaFetchersProp = "num.replica.fetchers"
+  val ReplicaHighWatermarkCheckpointIntervalMsProp = "replica.high.watermark.checkpoint.interval.ms"
+  val FetchPurgatoryPurgeIntervalRequestsProp = "fetch.purgatory.purge.interval.requests"
+  val ProducerPurgatoryPurgeIntervalRequestsProp = "producer.purgatory.purge.interval.requests"
+  val AutoLeaderRebalanceEnableProp = "auto.leader.rebalance.enable"
+  val LeaderImbalancePerBrokerPercentageProp = "leader.imbalance.per.broker.percentage"
+  val LeaderImbalanceCheckIntervalSecondsProp = "leader.imbalance.check.interval.seconds"
+  val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
+  val InterBrokerSecurityProtocolProp = "security.inter.broker.protocol"
+  val InterBrokerProtocolVersionProp = "inter.broker.protocol.version"
+  /** ********* Controlled shutdown configuration ***********/
+  val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries"
+  val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms"
+  val ControlledShutdownEnableProp = "controlled.shutdown.enable"
+  /** ********* Consumer coordinator configuration ***********/
+  val ConsumerMinSessionTimeoutMsProp = "consumer.min.session.timeout.ms"
+  val ConsumerMaxSessionTimeoutMsProp = "consumer.max.session.timeout.ms"
+  /** ********* Offset management configuration ***********/
+  val OffsetMetadataMaxSizeProp = "offset.metadata.max.bytes"
+  val OffsetsLoadBufferSizeProp = "offsets.load.buffer.size"
+  val OffsetsTopicReplicationFactorProp = "offsets.topic.replication.factor"
+  val OffsetsTopicPartitionsProp = "offsets.topic.num.partitions"
+  val OffsetsTopicSegmentBytesProp = "offsets.topic.segment.bytes"
+  val OffsetsTopicCompressionCodecProp = "offsets.topic.compression.codec"
+  val OffsetsRetentionMinutesProp = "offsets.retention.minutes"
+  val OffsetsRetentionCheckIntervalMsProp = "offsets.retention.check.interval.ms"
+  val OffsetCommitTimeoutMsProp = "offsets.commit.timeout.ms"
+  val OffsetCommitRequiredAcksProp = "offsets.commit.required.acks"
+  /** ********* Quota Configuration ***********/
+  val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default"
+  val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default"
+  val ProducerQuotaBytesPerSecondOverridesProp = "quota.producer.bytes.per.second.overrides"
+  val ConsumerQuotaBytesPerSecondOverridesProp = "quota.consumer.bytes.per.second.overrides"
+  val NumQuotaSamplesProp = "quota.window.num"
+  val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
+
+  val DeleteTopicEnableProp = "delete.topic.enable"
+  val CompressionTypeProp = "compression.type"
+
+  /** ********* Kafka Metrics Configuration ***********/
+  val MetricSampleWindowMsProp = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG
+  val MetricNumSamplesProp: String = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG
+  val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
+
+  /** ********* SSL Configuration ****************/
+  val PrincipalBuilderClassProp = SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
+  val SSLProtocolProp = SSLConfigs.SSL_PROTOCOL_CONFIG
+  val SSLProviderProp = SSLConfigs.SSL_PROVIDER_CONFIG
+  val SSLCipherSuitesProp = SSLConfigs.SSL_CIPHER_SUITES_CONFIG
+  val SSLEnabledProtocolsProp = SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG
+  val SSLKeystoreTypeProp = SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG
+  val SSLKeystoreLocationProp = SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG
+  val SSLKeystorePasswordProp = SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG
+  val SSLKeyPasswordProp = SSLConfigs.SSL_KEY_PASSWORD_CONFIG
+  val SSLTruststoreTypeProp = SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG
+  val SSLTruststoreLocationProp = SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG
+  val SSLTruststorePasswordProp = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG
+  val SSLKeyManagerAlgorithmProp = SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG
+  val SSLTrustManagerAlgorithmProp = SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG
+  val SSLEndpointIdentificationAlgorithmProp = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
+  val SSLClientAuthProp = SSLConfigs.SSL_CLIENT_AUTH_CONFIG
+"""
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/kafka/directory.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/directory.py b/tests/kafkatest/services/kafka/directory.py
new file mode 100644
index 0000000..59af1fc
--- /dev/null
+++ b/tests/kafkatest/services/kafka/directory.py
@@ -0,0 +1,32 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# "trunk" installation of kafka
+KAFKA_TRUNK = "kafka-trunk"
+
+
+def kafka_dir(node=None):
+    """Return name of kafka directory for the given node.
+
+    This provides a convenient way to support different versions of kafka or kafka tools running
+    on different nodes.
+    """
+    if node is None:
+        return KAFKA_TRUNK
+
+    if not hasattr(node, "version"):
+        return KAFKA_TRUNK
+
+    return "kafka-" + str(node.version)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
new file mode 100644
index 0000000..5e4a1e1
--- /dev/null
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -0,0 +1,303 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.service import Service
+from ducktape.utils.util import wait_until
+
+from config import KafkaConfig
+from kafkatest.services.kafka import config_property
+from kafkatest.services.kafka.version import TRUNK
+from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
+
+from kafkatest.services.monitor.jmx import JmxMixin
+from kafkatest.utils.security_config import SecurityConfig
+import json
+import re
+import signal
+import subprocess
+import time
+
+
+class KafkaService(JmxMixin, Service):
+
+    logs = {
+        "kafka_log": {
+            "path": "/mnt/kafka.log",
+            "collect_default": True},
+        "kafka_operational_logs": {
+            "path": "/mnt/kafka-operational-logs",
+            "collect_default": True},
+        "kafka_data": {
+            "path": "/mnt/kafka-data-logs",
+            "collect_default": False}
+    }
+
+    def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
+                 topics=None, version=TRUNK, quota_config=None, jmx_object_names=None, jmx_attributes=[]):
+        """
+        :type context
+        :type zk: ZookeeperService
+        :type topics: dict
+        """
+        Service.__init__(self, context, num_nodes)
+        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
+
+        self.zk = zk
+        self.quota_config = quota_config
+
+        self.security_protocol = security_protocol
+        self.interbroker_security_protocol = interbroker_security_protocol
+        self.topics = topics
+
+        for node in self.nodes:
+            node.version = version
+            node.config = KafkaConfig(**{config_property.BROKER_ID: self.idx(node)})
+
+    @property
+    def security_config(self):
+        if self.security_protocol == SecurityConfig.SSL or self.interbroker_security_protocol == SecurityConfig.SSL:
+            return SecurityConfig(SecurityConfig.SSL)
+        else:
+            return SecurityConfig(SecurityConfig.PLAINTEXT)
+
+    @property
+    def port(self):
+        return 9092 if self.security_protocol == SecurityConfig.PLAINTEXT else 9093
+
+    def start(self):
+        Service.start(self)
+
+        # Create topics if necessary
+        if self.topics is not None:
+            for topic, topic_cfg in self.topics.items():
+                if topic_cfg is None:
+                    topic_cfg = {}
+
+                topic_cfg["topic"] = topic
+                self.create_topic(topic_cfg)
+
+    def prop_file(self, node):
+        cfg = KafkaConfig(**node.config)
+        cfg[config_property.ADVERTISED_HOSTNAME] = node.account.hostname
+        cfg[config_property.ZOOKEEPER_CONNECT] = self.zk.connect_setting()
+
+        # TODO - clean up duplicate configuration logic
+        prop_file = cfg.render()
+        prop_file += self.render('kafka.properties', node=node, broker_id=self.idx(node),
+                                  security_config=self.security_config, port=self.port)
+        return prop_file
+
+    def start_cmd(self, node):
+        cmd = "export JMX_PORT=%d; " % self.jmx_port
+        cmd += "export LOG_DIR=/mnt/kafka-operational-logs/; "
+        cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log &"
+        return cmd
+
+    def start_node(self, node):
+        prop_file = self.prop_file(node)
+        self.logger.info("kafka.properties:")
+        self.logger.info(prop_file)
+        node.account.create_file("/mnt/kafka.properties", prop_file)
+
+        self.security_config.setup_node(node)
+
+        cmd = self.start_cmd(node)
+        self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
+        with node.account.monitor_log("/mnt/kafka.log") as monitor:
+            node.account.ssh(cmd)
+            monitor.wait_until("Kafka Server.*started", timeout_sec=30, err_msg="Kafka server didn't finish startup")
+
+        self.start_jmx_tool(self.idx(node), node)
+        if len(self.pids(node)) == 0:
+            raise Exception("No process ids recorded on node %s" % str(node))
+
+    def pids(self, node):
+        """Return process ids associated with running processes on the given node."""
+        try:
+            cmd = "ps ax | grep -i kafka | grep java | grep -v grep | awk '{print $1}'"
+
+            pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
+            return pid_arr
+        except (subprocess.CalledProcessError, ValueError) as e:
+            return []
+
+    def signal_node(self, node, sig=signal.SIGTERM):
+        pids = self.pids(node)
+        for pid in pids:
+            node.account.signal(pid, sig)
+
+    def signal_leader(self, topic, partition=0, sig=signal.SIGTERM):
+        leader = self.leader(topic, partition)
+        self.signal_node(leader, sig)
+
+    def stop_node(self, node, clean_shutdown=True):
+        pids = self.pids(node)
+        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
+
+        for pid in pids:
+            node.account.signal(pid, sig, allow_fail=False)
+        wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=20, err_msg="Kafka node failed to stop")
+
+    def clean_node(self, node):
+        JmxMixin.clean_node(self, node)
+        self.security_config.clean_node(node)
+        node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
+        node.account.ssh("rm -rf /mnt/*", allow_fail=False)
+
+    def create_topic(self, topic_cfg, node=None):
+        """Run the admin tool create topic command.
+        Specifying node is optional, and may be done if for different kafka nodes have different versions,
+        and we care where command gets run.
+
+        If the node is not specified, run the command from self.nodes[0]
+        """
+        if node is None:
+            node = self.nodes[0]
+        self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg)
+
+        cmd = "/opt/%s/bin/kafka-topics.sh " % kafka_dir(node)
+        cmd += "--zookeeper %(zk_connect)s --create --topic %(topic)s --partitions %(partitions)d --replication-factor %(replication)d" % {
+                'zk_connect': self.zk.connect_setting(),
+                'topic': topic_cfg.get("topic"),
+                'partitions': topic_cfg.get('partitions', 1),
+                'replication': topic_cfg.get('replication-factor', 1)
+            }
+
+        if "configs" in topic_cfg.keys() and topic_cfg["configs"] is not None:
+            for config_name, config_value in topic_cfg["configs"].items():
+                cmd += " --config %s=%s" % (config_name, str(config_value))
+
+        self.logger.info("Running topic creation command...\n%s" % cmd)
+        node.account.ssh(cmd)
+
+        time.sleep(1)
+        self.logger.info("Checking to see if topic was properly created...\n%s" % cmd)
+        for line in self.describe_topic(topic_cfg["topic"]).split("\n"):
+            self.logger.info(line)
+
+    def describe_topic(self, topic, node=None):
+        if node is None:
+            node = self.nodes[0]
+        cmd = "/opt/%s/bin/kafka-topics.sh --zookeeper %s --topic %s --describe" % \
+              (kafka_dir(node), self.zk.connect_setting(), topic)
+        output = ""
+        for line in node.account.ssh_capture(cmd):
+            output += line
+        return output
+
+    def verify_reassign_partitions(self, reassignment, node=None):
+        """Run the reassign partitions admin tool in "verify" mode
+        """
+        if node is None:
+            node = self.nodes[0]
+
+        json_file = "/tmp/%s_reassign.json" % str(time.time())
+
+        # reassignment to json
+        json_str = json.dumps(reassignment)
+        json_str = json.dumps(json_str)
+
+        # create command
+        cmd = "echo %s > %s && " % (json_str, json_file)
+        cmd += "/opt/%s/bin/kafka-reassign-partitions.sh " % kafka_dir(node)
+        cmd += "--zookeeper %s " % self.zk.connect_setting()
+        cmd += "--reassignment-json-file %s " % json_file
+        cmd += "--verify "
+        cmd += "&& sleep 1 && rm -f %s" % json_file
+
+        # send command
+        self.logger.info("Verifying parition reassignment...")
+        self.logger.debug(cmd)
+        output = ""
+        for line in node.account.ssh_capture(cmd):
+            output += line
+
+        self.logger.debug(output)
+
+        if re.match(".*is in progress.*", output) is not None:
+            return False
+
+        return True
+
+    def execute_reassign_partitions(self, reassignment, node=None):
+        """Run the reassign partitions admin tool in "verify" mode
+        """
+        if node is None:
+            node = self.nodes[0]
+        json_file = "/tmp/%s_reassign.json" % str(time.time())
+
+        # reassignment to json
+        json_str = json.dumps(reassignment)
+        json_str = json.dumps(json_str)
+
+        # create command
+        cmd = "echo %s > %s && " % (json_str, json_file)
+        cmd += "/opt/%s/bin/kafka-reassign-partitions.sh " % kafka_dir(node)
+        cmd += "--zookeeper %s " % self.zk.connect_setting()
+        cmd += "--reassignment-json-file %s " % json_file
+        cmd += "--execute"
+        cmd += " && sleep 1 && rm -f %s" % json_file
+
+        # send command
+        self.logger.info("Executing parition reassignment...")
+        self.logger.debug(cmd)
+        output = ""
+        for line in node.account.ssh_capture(cmd):
+            output += line
+
+        self.logger.debug("Verify partition reassignment:")
+        self.logger.debug(output)
+
+    def restart_node(self, node, clean_shutdown=True):
+        """Restart the given node."""
+        self.stop_node(node, clean_shutdown)
+        self.start_node(node)
+
+    def leader(self, topic, partition=0):
+        """ Get the leader replica for the given topic and partition.
+        """
+        kafka_dir = KAFKA_TRUNK
+        cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " %\
+              (kafka_dir, self.zk.connect_setting())
+        cmd += "get /brokers/topics/%s/partitions/%d/state" % (topic, partition)
+        self.logger.debug(cmd)
+
+        node = self.zk.nodes[0]
+        self.logger.debug("Querying zookeeper to find leader replica for topic %s: \n%s" % (cmd, topic))
+        partition_state = None
+        for line in node.account.ssh_capture(cmd):
+            # loop through all lines in the output, but only hold on to the first match
+            if partition_state is None:
+                match = re.match("^({.+})$", line)
+                if match is not None:
+                    partition_state = match.groups()[0]
+
+        if partition_state is None:
+            raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
+
+        partition_state = json.loads(partition_state)
+        self.logger.info(partition_state)
+
+        leader_idx = int(partition_state["leader"])
+        self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx))
+        return self.get_node(leader_idx)
+
+    def bootstrap_servers(self):
+        """Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,...
+        using the port for the configured security protocol.
+
+        This is the format expected by many config files.
+        """
+        return ','.join([node.account.hostname + ":" + str(self.port) for node in self.nodes])

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
new file mode 100644
index 0000000..4db1120
--- /dev/null
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+
+advertised.host.name={{ node.account.hostname }}
+
+{% if security_protocol == interbroker_security_protocol %}
+listeners={{ security_protocol }}://:{{ port }}
+advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:{{ port }}
+{% else %}
+listeners=PLAINTEXT://:9092,SSL://:9093
+advertised.listeners=PLAINTEXT://{{ node.account.hostname }}:9092,SSL://{{ node.account.hostname }}:9093
+{% endif %}
+
+num.network.threads=3
+num.io.threads=8
+socket.send.buffer.bytes=102400
+socket.receive.buffer.bytes=65536
+socket.request.max.bytes=104857600
+
+num.partitions=1
+num.recovery.threads.per.data.dir=1
+log.retention.hours=168
+log.segment.bytes=1073741824
+log.retention.check.interval.ms=300000
+log.cleaner.enable=false
+
+{% if quota_config.quota_producer_default is defined and quota_config.quota_producer_default is not none %}
+quota.producer.default={{ quota_config.quota_producer_default }}
+{% endif %}
+
+{% if quota_config.quota_consumer_default is defined and quota_config.quota_consumer_default is not none %}
+quota.consumer.default={{ quota_config.quota_consumer_default }}
+{% endif %}
+
+{% if quota_config.quota_producer_bytes_per_second_overrides is defined and quota_config.quota_producer_bytes_per_second_overrides is not none %}
+quota.producer.bytes.per.second.overrides={{ quota_config.quota_producer_bytes_per_second_overrides }}
+{% endif %}
+
+{% if quota_config.quota_consumer_bytes_per_second_overrides is defined and quota_config.quota_consumer_bytes_per_second_overrides is not none %}
+quota.consumer.bytes.per.second.overrides={{ quota_config.quota_consumer_bytes_per_second_overrides }}
+{% endif %}
+
+security.inter.broker.protocol={{ interbroker_security_protocol }}
+
+ssl.keystore.location=/mnt/ssl/test.keystore.jks
+ssl.keystore.password=test-ks-passwd
+ssl.key.password=test-key-passwd
+ssl.keystore.type=JKS
+ssl.truststore.location=/mnt/ssl/test.truststore.jks
+ssl.truststore.password=test-ts-passwd
+ssl.truststore.type=JKS

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/kafka/version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/version.py b/tests/kafkatest/services/kafka/version.py
new file mode 100644
index 0000000..95f3448
--- /dev/null
+++ b/tests/kafkatest/services/kafka/version.py
@@ -0,0 +1,61 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.utils import kafkatest_version
+
+from distutils.version import LooseVersion
+
+
+class KafkaVersion(LooseVersion):
+    """Container for kafka versions which makes versions simple to compare.
+
+    distutils.version.LooseVersion (and StrictVersion) has robust comparison and ordering logic.
+
+    Example:
+
+        v10 = KafkaVersion("0.10.0")
+        v9 = KafkaVersion("0.9.0.1")
+        assert v10 > v9  # assertion passes!
+    """
+    def __init__(self, version_string):
+        self.is_trunk = (version_string.lower() == "trunk")
+        if self.is_trunk:
+            # Since "trunk" may actually be a branch that is not trunk,
+            # use kafkatest_version() for comparison purposes,
+            # and track whether we're in "trunk" with a flag
+            version_string = kafkatest_version()
+
+            # Drop dev suffix if present
+            dev_suffix_index = version_string.find(".dev")
+            if dev_suffix_index >= 0:
+                version_string = version_string[:dev_suffix_index]
+
+        # Don't use the form super.(...).__init__(...) because
+        # LooseVersion is an "old style" python class
+        LooseVersion.__init__(self, version_string)
+
+    def __str__(self):
+        if self.is_trunk:
+            return "trunk"
+        else:
+            return LooseVersion.__str__(self)
+
+
+TRUNK = KafkaVersion("trunk")
+
+# 0.8.2.X versions
+V_0_8_2_1 = KafkaVersion("0.8.2.1")
+V_0_8_2_2 = KafkaVersion("0.8.2.2")
+LATEST_0_8_2 = V_0_8_2_2
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/kafka_log4j_appender.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
index ff6bb18..46ad82e 100644
--- a/tests/kafkatest/services/kafka_log4j_appender.py
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -15,6 +15,8 @@
 
 from ducktape.services.background_thread import BackgroundThreadService
 
+from kafkatest.services.kafka.directory import kafka_dir
+
 
 class KafkaLog4jAppender(BackgroundThreadService):
 
@@ -32,14 +34,15 @@ class KafkaLog4jAppender(BackgroundThreadService):
         self.max_messages = max_messages
 
     def _worker(self, idx, node):
-        cmd = self.start_cmd
+        cmd = self.start_cmd(node)
         self.logger.debug("VerifiableKafkaLog4jAppender %d command: %s" % (idx, cmd))
         node.account.ssh(cmd)
 
-    @property
-    def start_cmd(self):
-        cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableLog4jAppender" \
-              " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
+    def start_cmd(self, node):
+        cmd = "/opt/%s/bin/" % kafka_dir(node)
+        cmd += "kafka-run-class.sh org.apache.kafka.tools.VerifiableLog4jAppender"
+        cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
+
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
index afbed13..a3b4928 100644
--- a/tests/kafkatest/services/mirror_maker.py
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -17,6 +17,8 @@
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
 
+from kafkatest.services.kafka.directory import kafka_dir
+
 import os
 import subprocess
 
@@ -63,7 +65,6 @@ class MirrorMaker(Service):
     LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
     PRODUCER_CONFIG = os.path.join(PERSISTENT_ROOT, "producer.properties")
     CONSUMER_CONFIG = os.path.join(PERSISTENT_ROOT, "consumer.properties")
-    KAFKA_HOME = "/opt/kafka/"
 
     logs = {
         "mirror_maker_log": {
@@ -101,7 +102,7 @@ class MirrorMaker(Service):
     def start_cmd(self, node):
         cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
-        cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % MirrorMaker.KAFKA_HOME
+        cmd += " /opt/%s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % kafka_dir(node)
         cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG
         cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG
         if isinstance(self.num_streams, int):

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/monitor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/monitor/__init__.py b/tests/kafkatest/services/monitor/__init__.py
new file mode 100644
index 0000000..ec20143
--- /dev/null
+++ b/tests/kafkatest/services/monitor/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/monitor/jmx.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
new file mode 100644
index 0000000..06c7dc8
--- /dev/null
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.kafka.directory import kafka_dir
+
+class JmxMixin(object):
+    """This mixin helps existing service subclasses start JmxTool on their worker nodes and collect jmx stats.
+
+    Note that this is not a service in its own right.
+    """
+    def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=[]):
+        self.jmx_object_names = jmx_object_names
+        self.jmx_attributes = jmx_attributes
+        self.jmx_port = 9192
+
+        self.started = [False] * num_nodes
+        self.jmx_stats = [{} for x in range(num_nodes)]
+        self.maximum_jmx_value = {}  # map from object_attribute_name to maximum value observed over time
+        self.average_jmx_value = {}  # map from object_attribute_name to average value observed over time
+
+    def clean_node(self, node):
+        node.account.kill_process("jmx", clean_shutdown=False, allow_fail=True)
+        node.account.ssh("rm -rf /mnt/jmx_tool.log", allow_fail=False)
+
+    def start_jmx_tool(self, idx, node):
+        if self.started[idx-1] or self.jmx_object_names is None:
+            return
+
+        cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.JmxTool " \
+              "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % (kafka_dir(node), self.jmx_port)
+        for jmx_object_name in self.jmx_object_names:
+            cmd += " --object-name %s" % jmx_object_name
+        for jmx_attribute in self.jmx_attributes:
+            cmd += " --attributes %s" % jmx_attribute
+        cmd += " | tee -a /mnt/jmx_tool.log"
+
+        self.logger.debug("Start JmxTool %d command: %s", idx, cmd)
+        jmx_output = node.account.ssh_capture(cmd, allow_fail=False)
+        jmx_output.next()
+
+        self.started[idx-1] = True
+
+    def read_jmx_output(self, idx, node):
+        if self.started[idx-1] == False:
+            return
+
+        object_attribute_names = []
+
+        cmd = "cat /mnt/jmx_tool.log"
+        self.logger.debug("Read jmx output %d command: %s", idx, cmd)
+        for line in node.account.ssh_capture(cmd, allow_fail=False):
+            if "time" in line:
+                object_attribute_names = line.strip()[1:-1].split("\",\"")[1:]
+                continue
+            stats = [float(field) for field in line.split(',')]
+            time_sec = int(stats[0]/1000)
+            self.jmx_stats[idx-1][time_sec] = {name : stats[i+1] for i, name in enumerate(object_attribute_names)}
+
+        # do not calculate average and maximum of jmx stats until we have read output from all nodes
+        if any(len(time_to_stats) == 0 for time_to_stats in self.jmx_stats):
+            return
+
+        start_time_sec = min([min(time_to_stats.keys()) for time_to_stats in self.jmx_stats])
+        end_time_sec = max([max(time_to_stats.keys()) for time_to_stats in self.jmx_stats])
+
+        for name in object_attribute_names:
+            aggregates_per_time = []
+            for time_sec in xrange(start_time_sec, end_time_sec + 1):
+                # assume that value is 0 if it is not read by jmx tool at the given time. This is appropriate for metrics such as bandwidth
+                values_per_node = [time_to_stats.get(time_sec, {}).get(name, 0) for time_to_stats in self.jmx_stats]
+                # assume that value is aggregated across nodes by sum. This is appropriate for metrics such as bandwidth
+                aggregates_per_time.append(sum(values_per_node))
+            self.average_jmx_value[name] = sum(aggregates_per_time) / len(aggregates_per_time)
+            self.maximum_jmx_value[name] = max(aggregates_per_time)
+
+    def read_jmx_output_all_nodes(self):
+        for node in self.nodes:
+            self.read_jmx_output(self.idx(node), node)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/performance/consumer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py
index 053059b..e52220c 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from kafkatest.services.performance import PerformanceService
+from kafkatest.services.kafka.directory import kafka_dir
 from kafkatest.utils.security_config import SecurityConfig
 
 import os
@@ -120,11 +121,10 @@ class ConsumerPerformanceService(PerformanceService):
 
         return args
 
-    @property
-    def start_cmd(self):
+    def start_cmd(self, node):
         cmd = "export LOG_DIR=%s;" % ConsumerPerformanceService.LOG_DIR
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsumerPerformanceService.LOG4J_CONFIG
-        cmd += " /opt/kafka/bin/kafka-consumer-perf-test.sh"
+        cmd += " /opt/%s/bin/kafka-consumer-perf-test.sh" % kafka_dir(node)
         for key, value in self.args.items():
             cmd += " --%s %s" % (key, value)
         cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE
@@ -144,7 +144,7 @@ class ConsumerPerformanceService(PerformanceService):
         node.account.create_file(ConsumerPerformanceService.CONFIG_FILE, str(self.security_config))
         self.security_config.setup_node(node)
 
-        cmd = self.start_cmd
+        cmd = self.start_cmd(node)
         self.logger.debug("Consumer performance %d command: %s", idx, cmd)
         last = None
         for line in node.account.ssh_capture(cmd):

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/performance/end_to_end_latency.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py
index 0559a43..2be1621 100644
--- a/tests/kafkatest/services/performance/end_to_end_latency.py
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -16,6 +16,8 @@
 from kafkatest.services.performance import PerformanceService
 from kafkatest.utils.security_config import SecurityConfig
 
+from kafkatest.services.kafka.directory import kafka_dir
+
 
 class EndToEndLatencyService(PerformanceService):
 
@@ -51,10 +53,8 @@ class EndToEndLatencyService(PerformanceService):
             'ssl_config_file': ssl_config_file
         })
 
-        cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
-              "%(bootstrap_servers)s %(topic)s %(num_records)d "\
-              "%(acks)d 20 %(ssl_config_file)s" % args
-
+        cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % kafka_dir(node)
+        cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d 20 %(ssl_config_file)s" % args
         cmd += " | tee /mnt/end-to-end-latency.log"
 
         self.logger.debug("End-to-end latency %d command: %s", idx, cmd)