Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/27 23:18:33 UTC

[2/2] kafka git commit: KAFKA-1888: rolling upgrade test

KAFKA-1888: rolling upgrade test

ewencp gwenshap
This needs some refactoring to avoid the duplicated code between the replication test and the upgrade test, but it's in shape for initial feedback.

I'm interested in feedback on the added `KafkaConfig` class and `kafka_props` file. This addition makes it:
- easier to attach different configs to different nodes (e.g. during broker upgrade process)
- easier to reason about the configuration of a particular node (a short usage sketch follows below)
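
For illustration, a minimal sketch of how a test can now attach different versions and configs to individual broker nodes (assuming a ducktape test_context and a started ZookeeperService named zk, as in the sanity checks added in this patch):

    from kafkatest.services.kafka import KafkaService, config_property
    from kafkatest.services.kafka.version import LATEST_0_8_2

    # Two-broker cluster: node 0 stays on trunk, node 1 runs 0.8.2.X
    kafka = KafkaService(test_context, num_nodes=2, zk=zk,
                         topics={"topic": {"partitions": 1, "replication-factor": 2}})
    kafka.nodes[1].version = LATEST_0_8_2
    # Pin the inter-broker protocol so the trunk broker can talk to the 0.8.2 broker
    kafka.nodes[1].config[config_property.INTER_BROKER_PROTOCOL_VERSION] = "0.8.2.X"
    kafka.start()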

Notes:
- in the default values in the KafkaConfig class, I removed many properties that were previously in kafka.properties, since most of them were simply set to values that are already the defaults.
- when running a non-trunk VerifiableProducer, I append the trunk tools jar to the classpath and run it with the non-trunk kafka-run-class.sh script (rough sketch below)
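
As a rough sketch only (the jar path and helper name below are illustrative assumptions, not taken verbatim from the patch), the command construction is along these lines:

    from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK

    def verifiable_producer_cmd_sketch(node):
        # Prepend the trunk tools jar (path assumed for illustration) so a node running
        # an 0.8.2.X distribution can still execute VerifiableProducer, then launch the
        # tool with that node's own kafka-run-class.sh.
        cmd = "export CLASSPATH=$CLASSPATH:/opt/%s/tools/build/libs/kafka-tools-*.jar; " % KAFKA_TRUNK
        cmd += "/opt/%s/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer " % kafka_dir(node)
        cmd += "--topic test --broker-list worker1:9092 --max-messages 1000"
        return cmd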

Author: Geoff Anderson <ge...@confluent.io>

Reviewers: Dong Lin, Ewen Cheslack-Postava

Closes #229 from granders/KAFKA-1888-upgrade-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e6b34330
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e6b34330
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e6b34330

Branch: refs/heads/trunk
Commit: e6b343302f3208f7f6e0099fe2a7132ef9eaaafb
Parents: af42c37
Author: Geoff Anderson <ge...@confluent.io>
Authored: Tue Oct 27 15:23:47 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Oct 27 15:23:47 2015 -0700

----------------------------------------------------------------------
 tests/kafkatest/__init__.py                     |  10 +
 .../sanity_checks/test_console_consumer.py      |  43 ++-
 .../sanity_checks/test_kafka_version.py         |  55 ++++
 .../sanity_checks/test_verifiable_producer.py   |  70 +++++
 tests/kafkatest/services/console_consumer.py    | 109 ++++---
 tests/kafkatest/services/copycat.py             |   9 +-
 tests/kafkatest/services/kafka.py               | 253 ----------------
 tests/kafkatest/services/kafka/__init__.py      |  16 +
 tests/kafkatest/services/kafka/config.py        |  53 ++++
 .../kafkatest/services/kafka/config_property.py | 177 +++++++++++
 tests/kafkatest/services/kafka/directory.py     |  32 ++
 tests/kafkatest/services/kafka/kafka.py         | 303 +++++++++++++++++++
 .../services/kafka/templates/kafka.properties   |  65 ++++
 tests/kafkatest/services/kafka/version.py       |  61 ++++
 .../kafkatest/services/kafka_log4j_appender.py  |  13 +-
 tests/kafkatest/services/mirror_maker.py        |   5 +-
 tests/kafkatest/services/monitor/__init__.py    |  14 +
 tests/kafkatest/services/monitor/jmx.py         |  90 ++++++
 .../performance/consumer_performance.py         |   8 +-
 .../services/performance/end_to_end_latency.py  |   8 +-
 .../kafkatest/services/performance/jmx_mixin.py |  81 -----
 .../performance/producer_performance.py         |  44 +--
 .../services/templates/kafka.properties         |  74 -----
 tests/kafkatest/services/verifiable_producer.py |  88 +++++-
 tests/kafkatest/services/zookeeper.py           |   8 +-
 .../kafkatest/tests/produce_consume_validate.py | 106 +++++++
 tests/kafkatest/tests/quota_test.py             |  20 +-
 tests/kafkatest/tests/replication_test.py       | 207 +++++--------
 tests/kafkatest/tests/upgrade_test.py           |  81 +++++
 tests/kafkatest/utils/__init__.py               |   4 +-
 tests/kafkatest/utils/util.py                   |  42 +++
 tests/setup.py                                  |  10 +-
 .../apache/kafka/tools/VerifiableProducer.java  |  30 +-
 vagrant/base.sh                                 |  26 +-
 34 files changed, 1561 insertions(+), 654 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py
index 28d269b..e346811 100644
--- a/tests/kafkatest/__init__.py
+++ b/tests/kafkatest/__init__.py
@@ -14,3 +14,13 @@
 # limitations under the License.
 # see kafka.server.KafkaConfig for additional details and defaults
 
+# This determines the version of kafkatest that can be published to PyPi and installed with pip
+#
+# Note that in development, this version name can't follow Kafka's convention of having a trailing "-SNAPSHOT"
+# due to python version naming restrictions, which are enforced by python packaging tools
+# (see  https://www.python.org/dev/peps/pep-0440/)
+#
+# Instead, in trunk, the version should have a suffix of the form ".devN"
+#
+# For example, when Kafka is at version 0.9.0.0-SNAPSHOT, this should be something like "0.9.0.0.dev0"
+__version__ = '0.9.0.0.dev0'

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/sanity_checks/test_console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index a9c4d53..0d2c1fd 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -20,11 +20,16 @@ from ducktape.mark import matrix
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import LATEST_0_8_2
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.utils.remote_account import line_count, file_exists
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.utils.security_config import SecurityConfig
+
 
 import time
 
+
 class ConsoleConsumerTest(Test):
     """Sanity checks on console consumer service class."""
     def __init__(self, test_context):
@@ -32,24 +37,29 @@ class ConsoleConsumerTest(Test):
 
         self.topic = "topic"
         self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
+                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
+        self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic)
 
     def setUp(self):
         self.zk.start()
 
-    @parametrize(security_protocol='SSL', new_consumer=True)
-    @matrix(security_protocol=['PLAINTEXT'], new_consumer=[False, True])
+    @parametrize(security_protocol=SecurityConfig.SSL, new_consumer=True)
+    @matrix(security_protocol=[SecurityConfig.PLAINTEXT], new_consumer=[False, True])
     def test_lifecycle(self, security_protocol, new_consumer):
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
-                                  security_protocol=security_protocol,
-                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
+        """Check that console consumer starts/stops properly, and that we are capturing log output."""
+
+        self.kafka.security_protocol = security_protocol
         self.kafka.start()
 
+        self.consumer.security_protocol = security_protocol
+        self.consumer.new_consumer = new_consumer
+
         t0 = time.time()
-        self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, security_protocol=security_protocol, new_consumer=new_consumer)
         self.consumer.start()
         node = self.consumer.nodes[0]
 
-        wait_until(lambda: self.consumer.alive(node), 
+        wait_until(lambda: self.consumer.alive(node),
             timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
         self.logger.info("consumer started in %s seconds " % str(time.time() - t0))
 
@@ -62,3 +72,22 @@ class ConsoleConsumerTest(Test):
         assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0
 
         self.consumer.stop_node(node)
+
+    def test_version(self):
+        """Check that console consumer v0.8.2.X successfully starts and consumes messages."""
+        self.kafka.start()
+
+        num_messages = 1000
+        self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
+                                           max_messages=num_messages, throughput=1000)
+        self.producer.start()
+        self.producer.wait()
+
+        self.consumer.nodes[0].version = LATEST_0_8_2
+        self.consumer.consumer_timeout_ms = 1000
+        self.consumer.start()
+        self.consumer.wait()
+
+        num_consumed = len(self.consumer.messages_consumed[1])
+        num_produced = self.producer.num_acked
+        assert num_produced == num_consumed, "num_produced: %d, num_consumed: %d" % (num_produced, num_consumed)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/sanity_checks/test_kafka_version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_kafka_version.py b/tests/kafkatest/sanity_checks/test_kafka_version.py
new file mode 100644
index 0000000..f5f5d5f
--- /dev/null
+++ b/tests/kafkatest/sanity_checks/test_kafka_version.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService, config_property
+from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK
+from kafkatest.utils import is_version
+
+
+class KafkaVersionTest(Test):
+    """Sanity checks on kafka versioning."""
+    def __init__(self, test_context):
+        super(KafkaVersionTest, self).__init__(test_context)
+
+        self.topic = "topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+
+    def setUp(self):
+        self.zk.start()
+
+    def test_0_8_2(self):
+        """Test kafka service node-versioning api - verify that we can bring up a single-node 0.8.2.X cluster."""
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
+                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
+        node = self.kafka.nodes[0]
+        node.version = LATEST_0_8_2
+        self.kafka.start()
+
+        assert is_version(node, [LATEST_0_8_2])
+
+    def test_multi_version(self):
+        """Test kafka service node-versioning api - ensure we can bring up a 2-node cluster, one on version 0.8.2.X,
+        the other on trunk."""
+        self.kafka = KafkaService(self.test_context, num_nodes=2, zk=self.zk,
+                                  topics={self.topic: {"partitions": 1, "replication-factor": 2}})
+        self.kafka.nodes[1].version = LATEST_0_8_2
+        self.kafka.nodes[1].config[config_property.INTER_BROKER_PROTOCOL_VERSION] = "0.8.2.X"
+        self.kafka.start()
+
+        assert is_version(self.kafka.nodes[0], [TRUNK.vstring])
+        assert is_version(self.kafka.nodes[1], [LATEST_0_8_2])

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/sanity_checks/test_verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
new file mode 100644
index 0000000..4155279
--- /dev/null
+++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from ducktape.mark import parametrize
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK, KafkaVersion
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.utils import is_version
+
+
+class TestVerifiableProducer(Test):
+    """Sanity checks on verifiable producer service class."""
+    def __init__(self, test_context):
+        super(TestVerifiableProducer, self).__init__(test_context)
+
+        self.topic = "topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
+                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
+
+        self.num_messages = 100
+        # This will produce to source kafka cluster
+        self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
+                                           max_messages=self.num_messages, throughput=1000)
+
+    def setUp(self):
+        self.zk.start()
+        self.kafka.start()
+
+    @parametrize(producer_version=str(LATEST_0_8_2))
+    @parametrize(producer_version=str(TRUNK))
+    def test_simple_run(self, producer_version=TRUNK):
+        """
+        Test that we can start VerifiableProducer on trunk or against the 0.8.2 jar, and
+        verify that we can produce a small number of messages.
+        """
+        node = self.producer.nodes[0]
+        node.version = KafkaVersion(producer_version)
+        self.producer.start()
+        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5,
+             err_msg="Producer failed to start in a reasonable amount of time.")
+
+        # using version.vstring (distutils.version.LooseVersion) is a tricky way of ensuring
+        # that this check works with TRUNK
+        # When running VerifiableProducer 0.8.X, both trunk version and 0.8.X should show up because of the way
+        # verifiable producer pulls in some trunk directories into its classpath
+        assert is_version(node, [node.version.vstring, TRUNK.vstring])
+
+        self.producer.wait()
+        num_produced = self.producer.num_acked
+        assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 96fe777..07343e8 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -13,15 +13,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.services.background_thread import BackgroundThreadService
 from ducktape.utils.util import wait_until
-from kafkatest.services.performance.jmx_mixin import JmxMixin
-from kafkatest.services.performance import PerformanceService
+from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.services.kafka.directory import kafka_dir
+from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2
+from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.utils.security_config import SecurityConfig
 
+import itertools
 import os
 import subprocess
-import itertools
+
 
 def is_int(msg):
     """Default method used to check whether text pulled from console consumer is a message.
@@ -30,7 +33,7 @@ def is_int(msg):
     """
     try:
         return int(msg)
-    except:
+    except ValueError:
         return None
 
 """
@@ -74,7 +77,7 @@ Option                                  Description
 """
 
 
-class ConsoleConsumer(JmxMixin, PerformanceService):
+class ConsoleConsumer(JmxMixin, BackgroundThreadService):
     # Root directory for persistent output
     PERSISTENT_ROOT = "/mnt/console_consumer"
     STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stdout")
@@ -94,10 +97,10 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
         "consumer_log": {
             "path": LOG_FILE,
             "collect_default": True}
-        }
+    }
 
-    def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, new_consumer=None, message_validator=None,
-                 from_beginning=True, consumer_timeout_ms=None, client_id="console-consumer", jmx_object_names=None, jmx_attributes=[]):
+    def __init__(self, context, num_nodes, kafka, topic, security_protocol=SecurityConfig.PLAINTEXT, new_consumer=False, message_validator=None,
+                 from_beginning=True, consumer_timeout_ms=None, version=TRUNK, client_id="console-consumer", jmx_object_names=None, jmx_attributes=[]):
         """
         Args:
             context:                    standard context
@@ -114,7 +117,7 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
                                         in a topic.
         """
         JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
-        PerformanceService.__init__(self, context, num_nodes)
+        BackgroundThreadService.__init__(self, context, num_nodes)
         self.kafka = kafka
         self.new_consumer = new_consumer
         self.args = {
@@ -122,47 +125,70 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
         }
 
         self.consumer_timeout_ms = consumer_timeout_ms
+        for node in self.nodes:
+            node.version = version
 
         self.from_beginning = from_beginning
         self.message_validator = message_validator
         self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
         self.client_id = client_id
+        self.security_protocol = security_protocol
+
+        # Validate a few configs
+        if self.new_consumer is None:
+            self.new_consumer = self.security_protocol == SecurityConfig.SSL
+        if self.security_protocol == SecurityConfig.SSL and not self.new_consumer:
+            raise Exception("SSL protocol is supported only with the new consumer")
 
+    def prop_file(self, node):
+        """Return a string which can be used to create a configuration file appropriate for the given node."""
         # Process client configuration
-        self.prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms, client_id=self.client_id)
+        prop_file = self.render('console_consumer.properties')
+        if hasattr(node, "version") and node.version <= LATEST_0_8_2:
+            # in 0.8.2.X and earlier, console consumer does not have --timeout-ms option
+            # instead, we have to pass it through the config file
+            prop_file += "\nconsumer.timeout.ms=%s\n" % str(self.consumer_timeout_ms)
 
         # Add security properties to the config. If security protocol is not specified,
         # use the default in the template properties.
-        self.security_config = SecurityConfig(security_protocol, self.prop_file)
+        self.security_config = SecurityConfig(self.security_protocol, prop_file)
         self.security_protocol = self.security_config.security_protocol
-        if self.new_consumer is None:
-            self.new_consumer = self.security_protocol == SecurityConfig.SSL
-        if self.security_protocol == SecurityConfig.SSL and not self.new_consumer:
-            raise Exception("SSL protocol is supported only with the new consumer")
-        self.prop_file += str(self.security_config)
 
-    @property
-    def start_cmd(self):
+        prop_file += str(self.security_config)
+        return prop_file
+
+    def start_cmd(self, node):
+        """Return the start command appropriate for the given node."""
         args = self.args.copy()
         args['zk_connect'] = self.kafka.zk.connect_setting()
         args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
         args['stderr'] = ConsoleConsumer.STDERR_CAPTURE
+        args['log_dir'] = ConsoleConsumer.LOG_DIR
+        args['log4j_config'] = ConsoleConsumer.LOG4J_CONFIG
         args['config_file'] = ConsoleConsumer.CONFIG_FILE
+        args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
         args['jmx_port'] = self.jmx_port
+        args['kafka_dir'] = kafka_dir(node)
+        args['broker_list'] = self.kafka.bootstrap_servers()
 
-        cmd = "export LOG_DIR=%s;" % ConsoleConsumer.LOG_DIR
-        cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsoleConsumer.LOG4J_CONFIG
-        cmd += " JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s" \
-            " --consumer.config %(config_file)s" % args
+        cmd = "export JMX_PORT=%(jmx_port)s; " \
+              "export LOG_DIR=%(log_dir)s; " \
+              "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j_config)s\"; " \
+              "/opt/%(kafka_dir)s/bin/kafka-console-consumer.sh " \
+              "--topic %(topic)s --consumer.config %(config_file)s" % args
 
         if self.new_consumer:
-            cmd += " --new-consumer --bootstrap-server %s"  % self.kafka.bootstrap_servers()
+            cmd += " --new-consumer --bootstrap-server %(broker_list)s" % args
         else:
             cmd += " --zookeeper %(zk_connect)s" % args
         if self.from_beginning:
             cmd += " --from-beginning"
+
         if self.consumer_timeout_ms is not None:
-            cmd += " --timeout-ms %s" % self.consumer_timeout_ms
+            # version 0.8.X and below do not support --timeout-ms option
+            # This will be added in the properties file instead
+            if node.version > LATEST_0_8_2:
+                cmd += " --timeout-ms %s" % self.consumer_timeout_ms
 
         cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
         return cmd
@@ -183,8 +209,10 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
 
         # Create and upload config file
         self.logger.info("console_consumer.properties:")
-        self.logger.info(self.prop_file)
-        node.account.create_file(ConsoleConsumer.CONFIG_FILE, self.prop_file)
+
+        prop_file = self.prop_file(node)
+        self.logger.info(prop_file)
+        node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
         self.security_config.setup_node(node)
 
         # Create and upload log properties
@@ -192,23 +220,26 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
         node.account.create_file(ConsoleConsumer.LOG4J_CONFIG, log_config)
 
         # Run and capture output
-        cmd = self.start_cmd
+        cmd = self.start_cmd(node)
         self.logger.debug("Console consumer %d command: %s", idx, cmd)
 
         consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
-        first_line = consumer_output.next()
-        self.start_jmx_tool(idx, node)
-        for line in itertools.chain([first_line], consumer_output):
-            msg = line.strip()
-            if self.message_validator is not None:
-                msg = self.message_validator(msg)
-            if msg is not None:
-                self.messages_consumed[idx].append(msg)
+        first_line = next(consumer_output, None)
+
+        if first_line is not None:
+            self.start_jmx_tool(idx, node)
+
+            for line in itertools.chain([first_line], consumer_output):
+                msg = line.strip()
+                if self.message_validator is not None:
+                    msg = self.message_validator(msg)
+                if msg is not None:
+                    self.messages_consumed[idx].append(msg)
 
-        self.read_jmx_output(idx, node)
+            self.read_jmx_output(idx, node)
 
     def start_node(self, node):
-        PerformanceService.start_node(self, node)
+        BackgroundThreadService.start_node(self, node)
 
     def stop_node(self, node):
         node.account.kill_process("console_consumer", allow_fail=True)
@@ -220,6 +251,6 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
             self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
                              (self.__class__.__name__, node.account))
         JmxMixin.clean_node(self, node)
-        PerformanceService.clean_node(self, node)
+        node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
         node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
         self.security_config.clean_node(node)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/copycat.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/copycat.py b/tests/kafkatest/services/copycat.py
index 45ef330..831a932 100644
--- a/tests/kafkatest/services/copycat.py
+++ b/tests/kafkatest/services/copycat.py
@@ -15,8 +15,9 @@
 
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
-import subprocess, signal
 
+from kafkatest.services.kafka.directory import kafka_dir
+import signal
 
 class CopycatServiceBase(Service):
     """Base class for Copycat services providing some common settings and functionality"""
@@ -99,7 +100,7 @@ class CopycatStandaloneService(CopycatServiceBase):
 
         self.logger.info("Starting Copycat standalone process")
         with node.account.monitor_log("/mnt/copycat.log") as monitor:
-            node.account.ssh("/opt/kafka/bin/copycat-standalone.sh /mnt/copycat.properties " +
+            node.account.ssh("/opt/%s/bin/copycat-standalone.sh /mnt/copycat.properties " % kafka_dir(node) +
                              " ".join(remote_connector_configs) +
                              " 1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! > /mnt/copycat.pid")
             monitor.wait_until('Copycat started', timeout_sec=10, err_msg="Never saw message indicating Copycat finished startup")
@@ -108,7 +109,6 @@ class CopycatStandaloneService(CopycatServiceBase):
             raise RuntimeError("No process ids recorded")
 
 
-
 class CopycatDistributedService(CopycatServiceBase):
     """Runs Copycat in distributed mode."""
 
@@ -128,7 +128,7 @@ class CopycatDistributedService(CopycatServiceBase):
 
         self.logger.info("Starting Copycat distributed process")
         with node.account.monitor_log("/mnt/copycat.log") as monitor:
-            cmd = "/opt/kafka/bin/copycat-distributed.sh /mnt/copycat.properties "
+            cmd = "/opt/%s/bin/copycat-distributed.sh /mnt/copycat.properties " % kafka_dir(node)
             # Only submit connectors on the first node so they don't get submitted multiple times. Also only submit them
             # the first time the node is started so
             if self.first_start and node == self.nodes[0]:
@@ -140,4 +140,3 @@ class CopycatDistributedService(CopycatServiceBase):
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")
 
-        self.first_start = False

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka.py b/tests/kafkatest/services/kafka.py
deleted file mode 100644
index 5c4b22f..0000000
--- a/tests/kafkatest/services/kafka.py
+++ /dev/null
@@ -1,253 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.services.service import Service
-from ducktape.utils.util import wait_until
-from kafkatest.services.performance.jmx_mixin import JmxMixin
-from kafkatest.utils.security_config import SecurityConfig
-import json
-import re
-import signal
-import time
-
-
-class KafkaService(JmxMixin, Service):
-
-    logs = {
-        "kafka_log": {
-            "path": "/mnt/kafka.log",
-            "collect_default": True},
-        "kafka_data": {
-            "path": "/mnt/kafka-logs",
-            "collect_default": False}
-    }
-
-    def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
-                 topics=None, quota_config=None, jmx_object_names=None, jmx_attributes=[]):
-        """
-        :type context
-        :type zk: ZookeeperService
-        :type topics: dict
-        """
-        Service.__init__(self, context, num_nodes)
-        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
-        self.zk = zk
-        if security_protocol == SecurityConfig.SSL or interbroker_security_protocol == SecurityConfig.SSL:
-            self.security_config = SecurityConfig(SecurityConfig.SSL)
-        else:
-            self.security_config = SecurityConfig(SecurityConfig.PLAINTEXT)
-        self.security_protocol = security_protocol
-        self.interbroker_security_protocol = interbroker_security_protocol
-        self.port = 9092 if security_protocol == SecurityConfig.PLAINTEXT else 9093
-        self.topics = topics
-        self.quota_config = quota_config
-
-    def start(self):
-        Service.start(self)
-
-        # Create topics if necessary
-        if self.topics is not None:
-            for topic, topic_cfg in self.topics.items():
-                if topic_cfg is None:
-                    topic_cfg = {}
-
-                topic_cfg["topic"] = topic
-                self.create_topic(topic_cfg)
-
-    def start_node(self, node):
-        props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node),
-            port = self.port, security_protocol = self.security_protocol, quota_config=self.quota_config,
-            interbroker_security_protocol=self.interbroker_security_protocol)
-        self.logger.info("kafka.properties:")
-        self.logger.info(props_file)
-        node.account.create_file("/mnt/kafka.properties", props_file)
-        self.security_config.setup_node(node)
-
-        cmd = "JMX_PORT=%d /opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid" % self.jmx_port
-        self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
-        with node.account.monitor_log("/mnt/kafka.log") as monitor:
-            node.account.ssh(cmd)
-            monitor.wait_until("Kafka Server.*started", timeout_sec=30, err_msg="Kafka server didn't finish startup")
-        self.start_jmx_tool(self.idx(node), node)
-        if len(self.pids(node)) == 0:
-            raise Exception("No process ids recorded on node %s" % str(node))
-
-    def pids(self, node):
-        """Return process ids associated with running processes on the given node."""
-        try:
-            return [pid for pid in node.account.ssh_capture("cat /mnt/kafka.pid", callback=int)]
-        except:
-            return []
-
-    def signal_node(self, node, sig=signal.SIGTERM):
-        pids = self.pids(node)
-        for pid in pids:
-            node.account.signal(pid, sig)
-
-    def signal_leader(self, topic, partition=0, sig=signal.SIGTERM):
-        leader = self.leader(topic, partition)
-        self.signal_node(leader, sig)
-
-    def stop_node(self, node, clean_shutdown=True):
-        pids = self.pids(node)
-        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
-
-        for pid in pids:
-            node.account.signal(pid, sig, allow_fail=False)
-
-        node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False)
-
-    def clean_node(self, node):
-        JmxMixin.clean_node(self, node)
-        node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
-        node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False)
-        self.security_config.clean_node(node)
-
-    def create_topic(self, topic_cfg):
-        node = self.nodes[0] # any node is fine here
-        self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg)
-
-        cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %(zk_connect)s --create "\
-            "--topic %(topic)s --partitions %(partitions)d --replication-factor %(replication)d" % {
-                'zk_connect': self.zk.connect_setting(),
-                'topic': topic_cfg.get("topic"),
-                'partitions': topic_cfg.get('partitions', 1),
-                'replication': topic_cfg.get('replication-factor', 1)
-            }
-
-        if "configs" in topic_cfg.keys() and topic_cfg["configs"] is not None:
-            for config_name, config_value in topic_cfg["configs"].items():
-                cmd += " --config %s=%s" % (config_name, str(config_value))
-
-        self.logger.info("Running topic creation command...\n%s" % cmd)
-        node.account.ssh(cmd)
-
-        time.sleep(1)
-        self.logger.info("Checking to see if topic was properly created...\n%s" % cmd)
-        for line in self.describe_topic(topic_cfg["topic"]).split("\n"):
-            self.logger.info(line)
-
-    def describe_topic(self, topic):
-        node = self.nodes[0]
-        cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %s --topic %s --describe" % \
-              (self.zk.connect_setting(), topic)
-        output = ""
-        for line in node.account.ssh_capture(cmd):
-            output += line
-        return output
-
-    def verify_reassign_partitions(self, reassignment):
-        """Run the reassign partitions admin tool in "verify" mode
-        """
-        node = self.nodes[0]
-        json_file = "/tmp/" + str(time.time()) + "_reassign.json"
-
-        # reassignment to json
-        json_str = json.dumps(reassignment)
-        json_str = json.dumps(json_str)
-
-        # create command
-        cmd = "echo %s > %s && " % (json_str, json_file)
-        cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\
-                "--zookeeper %(zk_connect)s "\
-                "--reassignment-json-file %(reassignment_file)s "\
-                "--verify" % {'zk_connect': self.zk.connect_setting(),
-                                'reassignment_file': json_file}
-        cmd += " && sleep 1 && rm -f %s" % json_file
-
-        # send command
-        self.logger.info("Verifying parition reassignment...")
-        self.logger.debug(cmd)
-        output = ""
-        for line in node.account.ssh_capture(cmd):
-            output += line
-
-        self.logger.debug(output)
-
-        if re.match(".*is in progress.*", output) is not None:
-            return False
-
-        return True
-
-    def execute_reassign_partitions(self, reassignment):
-        """Run the reassign partitions admin tool in "verify" mode
-        """
-        node = self.nodes[0]
-        json_file = "/tmp/" + str(time.time()) + "_reassign.json"
-
-        # reassignment to json
-        json_str = json.dumps(reassignment)
-        json_str = json.dumps(json_str)
-
-        # create command
-        cmd = "echo %s > %s && " % (json_str, json_file)
-        cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\
-                "--zookeeper %(zk_connect)s "\
-                "--reassignment-json-file %(reassignment_file)s "\
-                "--execute" % {'zk_connect': self.zk.connect_setting(),
-                                'reassignment_file': json_file}
-        cmd += " && sleep 1 && rm -f %s" % json_file
-
-        # send command
-        self.logger.info("Executing parition reassignment...")
-        self.logger.debug(cmd)
-        output = ""
-        for line in node.account.ssh_capture(cmd):
-            output += line
-
-        self.logger.debug("Verify partition reassignment:")
-        self.logger.debug(output)
-
-    def restart_node(self, node, wait_sec=0, clean_shutdown=True):
-        """Restart the given node, waiting wait_sec in between stopping and starting up again."""
-        self.stop_node(node, clean_shutdown)
-        time.sleep(wait_sec)
-        self.start_node(node)
-
-    def leader(self, topic, partition=0):
-        """ Get the leader replica for the given topic and partition.
-        """
-        cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " \
-              % self.zk.connect_setting()
-        cmd += "get /brokers/topics/%s/partitions/%d/state" % (topic, partition)
-        self.logger.debug(cmd)
-
-        node = self.nodes[0]
-        self.logger.debug("Querying zookeeper to find leader replica for topic %s: \n%s" % (cmd, topic))
-        partition_state = None
-        for line in node.account.ssh_capture(cmd):
-            match = re.match("^({.+})$", line)
-            if match is not None:
-                partition_state = match.groups()[0]
-                break
-
-        if partition_state is None:
-            raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
-
-        partition_state = json.loads(partition_state)
-        self.logger.info(partition_state)
-
-        leader_idx = int(partition_state["leader"])
-        self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx))
-        return self.get_node(leader_idx)
-
-    def bootstrap_servers(self):
-        """Get the broker list to connect to Kafka using the specified security protocol
-        """
-        return ','.join([node.account.hostname + ":" + `self.port` for node in self.nodes])
-
-    def read_jmx_output_all_nodes(self):
-        for node in self.nodes:
-            self.read_jmx_output(self.idx(node), node)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/kafka/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/__init__.py b/tests/kafkatest/services/kafka/__init__.py
new file mode 100644
index 0000000..6408b59
--- /dev/null
+++ b/tests/kafkatest/services/kafka/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafka import KafkaService

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/kafka/config.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/config.py b/tests/kafkatest/services/kafka/config.py
new file mode 100644
index 0000000..0accf20
--- /dev/null
+++ b/tests/kafkatest/services/kafka/config.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import config_property
+
+
+class KafkaConfig(dict):
+    """A dictionary-like container class which allows for definition of overridable default values,
+    which is also capable of "rendering" itself as a useable server.properties file.
+    """
+
+    DEFAULTS = {
+        config_property.PORT: 9092,
+        config_property.SOCKET_RECEIVE_BUFFER_BYTES: 65536,
+        config_property.LOG_DIRS: "/mnt/kafka-logs",
+        config_property.ZOOKEEPER_CONNECTION_TIMEOUT_MS: 2000
+    }
+
+    def __init__(self, **kwargs):
+        super(KafkaConfig, self).__init__(**kwargs)
+
+        # Set defaults
+        for key, val in self.DEFAULTS.items():
+            if not self.has_key(key):
+                self[key] = val
+
+    def render(self):
+        """Render self as a series of lines key=val\n, and do so in a consistent order. """
+        keys = [k for k in self.keys()]
+        keys.sort()
+
+        s = ""
+        for k in keys:
+            s += "%s=%s\n" % (k, str(self[k]))
+        return s
+
+
+
+
+
+
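
For reference, a small usage sketch of the KafkaConfig class above (the zookeeper address is a made-up value):

    from kafkatest.services.kafka import config_property
    from kafkatest.services.kafka.config import KafkaConfig

    # Explicit keys override DEFAULTS; anything left unspecified falls back to the defaults above.
    cfg = KafkaConfig(**{config_property.BROKER_ID: 1})
    cfg[config_property.ZOOKEEPER_CONNECT] = "worker1:2181"

    # render() emits sorted key=value lines, e.g.
    #   broker.id=1
    #   log.dirs=/mnt/kafka-logs
    #   port=9092
    #   ...
    print cfg.render()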

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/kafka/config_property.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
new file mode 100644
index 0000000..cc685aa
--- /dev/null
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Define Kafka configuration property names here.
+"""
+
+BROKER_ID = "broker.id"
+PORT = "port"
+ADVERTISED_HOSTNAME = "advertised.host.name"
+
+NUM_NETWORK_THREADS = "num.network.threads"
+NUM_IO_THREADS = "num.io.threads"
+SOCKET_SEND_BUFFER_BYTES = "socket.send.buffer.bytes"
+SOCKET_RECEIVE_BUFFER_BYTES = "socket.receive.buffer.bytes"
+SOCKET_REQUEST_MAX_BYTES = "socket.request.max.bytes"
+LOG_DIRS = "log.dirs"
+NUM_PARTITIONS = "num.partitions"
+NUM_RECOVERY_THREADS_PER_DATA_DIR = "num.recovery.threads.per.data.dir"
+
+LOG_RETENTION_HOURS = "log.retention.hours"
+LOG_SEGMENT_BYTES = "log.segment.bytes"
+LOG_RETENTION_CHECK_INTERVAL_MS = "log.retention.check.interval.ms"
+LOG_CLEANER_ENABLE = "log.cleaner.enable"
+
+AUTO_CREATE_TOPICS_ENABLE = "auto.create.topics.enable"
+
+ZOOKEEPER_CONNECT = "zookeeper.connect"
+ZOOKEEPER_CONNECTION_TIMEOUT_MS = "zookeeper.connection.timeout.ms"
+INTER_BROKER_PROTOCOL_VERSION = "inter.broker.protocol.version"
+
+
+"""
+From KafkaConfig.scala
+
+  /** ********* General Configuration ***********/
+  val MaxReservedBrokerIdProp = "reserved.broker.max.id"
+  val MessageMaxBytesProp = "message.max.bytes"
+  val NumIoThreadsProp = "num.io.threads"
+  val BackgroundThreadsProp = "background.threads"
+  val QueuedMaxRequestsProp = "queued.max.requests"
+  /** ********* Socket Server Configuration ***********/
+  val PortProp = "port"
+  val HostNameProp = "host.name"
+  val ListenersProp = "listeners"
+  val AdvertisedPortProp = "advertised.port"
+  val AdvertisedListenersProp = "advertised.listeners"
+  val SocketSendBufferBytesProp = "socket.send.buffer.bytes"
+  val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes"
+  val SocketRequestMaxBytesProp = "socket.request.max.bytes"
+  val MaxConnectionsPerIpProp = "max.connections.per.ip"
+  val MaxConnectionsPerIpOverridesProp = "max.connections.per.ip.overrides"
+  val ConnectionsMaxIdleMsProp = "connections.max.idle.ms"
+  /** ********* Log Configuration ***********/
+  val NumPartitionsProp = "num.partitions"
+  val LogDirsProp = "log.dirs"
+  val LogDirProp = "log.dir"
+  val LogSegmentBytesProp = "log.segment.bytes"
+
+  val LogRollTimeMillisProp = "log.roll.ms"
+  val LogRollTimeHoursProp = "log.roll.hours"
+
+  val LogRollTimeJitterMillisProp = "log.roll.jitter.ms"
+  val LogRollTimeJitterHoursProp = "log.roll.jitter.hours"
+
+  val LogRetentionTimeMillisProp = "log.retention.ms"
+  val LogRetentionTimeMinutesProp = "log.retention.minutes"
+  val LogRetentionTimeHoursProp = "log.retention.hours"
+
+  val LogRetentionBytesProp = "log.retention.bytes"
+  val LogCleanupIntervalMsProp = "log.retention.check.interval.ms"
+  val LogCleanupPolicyProp = "log.cleanup.policy"
+  val LogCleanerThreadsProp = "log.cleaner.threads"
+  val LogCleanerIoMaxBytesPerSecondProp = "log.cleaner.io.max.bytes.per.second"
+  val LogCleanerDedupeBufferSizeProp = "log.cleaner.dedupe.buffer.size"
+  val LogCleanerIoBufferSizeProp = "log.cleaner.io.buffer.size"
+  val LogCleanerDedupeBufferLoadFactorProp = "log.cleaner.io.buffer.load.factor"
+  val LogCleanerBackoffMsProp = "log.cleaner.backoff.ms"
+  val LogCleanerMinCleanRatioProp = "log.cleaner.min.cleanable.ratio"
+  val LogCleanerEnableProp = "log.cleaner.enable"
+  val LogCleanerDeleteRetentionMsProp = "log.cleaner.delete.retention.ms"
+  val LogIndexSizeMaxBytesProp = "log.index.size.max.bytes"
+  val LogIndexIntervalBytesProp = "log.index.interval.bytes"
+  val LogFlushIntervalMessagesProp = "log.flush.interval.messages"
+  val LogDeleteDelayMsProp = "log.segment.delete.delay.ms"
+  val LogFlushSchedulerIntervalMsProp = "log.flush.scheduler.interval.ms"
+  val LogFlushIntervalMsProp = "log.flush.interval.ms"
+  val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms"
+  val LogPreAllocateProp = "log.preallocate"
+  val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir"
+  val MinInSyncReplicasProp = "min.insync.replicas"
+  /** ********* Replication configuration ***********/
+  val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms"
+  val DefaultReplicationFactorProp = "default.replication.factor"
+  val ReplicaLagTimeMaxMsProp = "replica.lag.time.max.ms"
+  val ReplicaSocketTimeoutMsProp = "replica.socket.timeout.ms"
+  val ReplicaSocketReceiveBufferBytesProp = "replica.socket.receive.buffer.bytes"
+  val ReplicaFetchMaxBytesProp = "replica.fetch.max.bytes"
+  val ReplicaFetchWaitMaxMsProp = "replica.fetch.wait.max.ms"
+  val ReplicaFetchMinBytesProp = "replica.fetch.min.bytes"
+  val ReplicaFetchBackoffMsProp = "replica.fetch.backoff.ms"
+  val NumReplicaFetchersProp = "num.replica.fetchers"
+  val ReplicaHighWatermarkCheckpointIntervalMsProp = "replica.high.watermark.checkpoint.interval.ms"
+  val FetchPurgatoryPurgeIntervalRequestsProp = "fetch.purgatory.purge.interval.requests"
+  val ProducerPurgatoryPurgeIntervalRequestsProp = "producer.purgatory.purge.interval.requests"
+  val AutoLeaderRebalanceEnableProp = "auto.leader.rebalance.enable"
+  val LeaderImbalancePerBrokerPercentageProp = "leader.imbalance.per.broker.percentage"
+  val LeaderImbalanceCheckIntervalSecondsProp = "leader.imbalance.check.interval.seconds"
+  val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
+  val InterBrokerSecurityProtocolProp = "security.inter.broker.protocol"
+  val InterBrokerProtocolVersionProp = "inter.broker.protocol.version"
+  /** ********* Controlled shutdown configuration ***********/
+  val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries"
+  val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms"
+  val ControlledShutdownEnableProp = "controlled.shutdown.enable"
+  /** ********* Consumer coordinator configuration ***********/
+  val ConsumerMinSessionTimeoutMsProp = "consumer.min.session.timeout.ms"
+  val ConsumerMaxSessionTimeoutMsProp = "consumer.max.session.timeout.ms"
+  /** ********* Offset management configuration ***********/
+  val OffsetMetadataMaxSizeProp = "offset.metadata.max.bytes"
+  val OffsetsLoadBufferSizeProp = "offsets.load.buffer.size"
+  val OffsetsTopicReplicationFactorProp = "offsets.topic.replication.factor"
+  val OffsetsTopicPartitionsProp = "offsets.topic.num.partitions"
+  val OffsetsTopicSegmentBytesProp = "offsets.topic.segment.bytes"
+  val OffsetsTopicCompressionCodecProp = "offsets.topic.compression.codec"
+  val OffsetsRetentionMinutesProp = "offsets.retention.minutes"
+  val OffsetsRetentionCheckIntervalMsProp = "offsets.retention.check.interval.ms"
+  val OffsetCommitTimeoutMsProp = "offsets.commit.timeout.ms"
+  val OffsetCommitRequiredAcksProp = "offsets.commit.required.acks"
+  /** ********* Quota Configuration ***********/
+  val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default"
+  val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default"
+  val ProducerQuotaBytesPerSecondOverridesProp = "quota.producer.bytes.per.second.overrides"
+  val ConsumerQuotaBytesPerSecondOverridesProp = "quota.consumer.bytes.per.second.overrides"
+  val NumQuotaSamplesProp = "quota.window.num"
+  val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
+
+  val DeleteTopicEnableProp = "delete.topic.enable"
+  val CompressionTypeProp = "compression.type"
+
+  /** ********* Kafka Metrics Configuration ***********/
+  val MetricSampleWindowMsProp = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG
+  val MetricNumSamplesProp: String = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG
+  val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
+
+  /** ********* SSL Configuration ****************/
+  val PrincipalBuilderClassProp = SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
+  val SSLProtocolProp = SSLConfigs.SSL_PROTOCOL_CONFIG
+  val SSLProviderProp = SSLConfigs.SSL_PROVIDER_CONFIG
+  val SSLCipherSuitesProp = SSLConfigs.SSL_CIPHER_SUITES_CONFIG
+  val SSLEnabledProtocolsProp = SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG
+  val SSLKeystoreTypeProp = SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG
+  val SSLKeystoreLocationProp = SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG
+  val SSLKeystorePasswordProp = SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG
+  val SSLKeyPasswordProp = SSLConfigs.SSL_KEY_PASSWORD_CONFIG
+  val SSLTruststoreTypeProp = SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG
+  val SSLTruststoreLocationProp = SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG
+  val SSLTruststorePasswordProp = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG
+  val SSLKeyManagerAlgorithmProp = SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG
+  val SSLTrustManagerAlgorithmProp = SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG
+  val SSLEndpointIdentificationAlgorithmProp = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
+  val SSLClientAuthProp = SSLConfigs.SSL_CLIENT_AUTH_CONFIG
+"""
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/kafka/directory.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/directory.py b/tests/kafkatest/services/kafka/directory.py
new file mode 100644
index 0000000..59af1fc
--- /dev/null
+++ b/tests/kafkatest/services/kafka/directory.py
@@ -0,0 +1,32 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# "trunk" installation of kafka
+KAFKA_TRUNK = "kafka-trunk"
+
+
+def kafka_dir(node=None):
+    """Return name of kafka directory for the given node.
+
+    This provides a convenient way to support different versions of kafka or kafka tools running
+    on different nodes.
+    """
+    if node is None:
+        return KAFKA_TRUNK
+
+    if not hasattr(node, "version"):
+        return KAFKA_TRUNK
+
+    return "kafka-" + str(node.version)
\ No newline at end of file
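
A quick sketch of how kafka_dir resolves per-node install directories (FakeNode is a stand-in for a ducktape cluster node, and the exact 0.8.2 directory name depends on how LATEST_0_8_2 renders):

    from kafkatest.services.kafka.directory import kafka_dir
    from kafkatest.services.kafka.version import LATEST_0_8_2

    class FakeNode(object):
        """Stand-in for a ducktape cluster node."""
        pass

    print kafka_dir(None)        # kafka-trunk (no node context)
    node = FakeNode()
    print kafka_dir(node)        # kafka-trunk (node has no version attribute)
    node.version = LATEST_0_8_2
    print kafka_dir(node)        # e.g. kafka-0.8.2.2
    # Services then build paths like "/opt/" + kafka_dir(node) + "/bin/kafka-server-start.sh"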

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
new file mode 100644
index 0000000..5e4a1e1
--- /dev/null
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -0,0 +1,303 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.service import Service
+from ducktape.utils.util import wait_until
+
+from config import KafkaConfig
+from kafkatest.services.kafka import config_property
+from kafkatest.services.kafka.version import TRUNK
+from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
+
+from kafkatest.services.monitor.jmx import JmxMixin
+from kafkatest.utils.security_config import SecurityConfig
+import json
+import re
+import signal
+import subprocess
+import time
+
+
+class KafkaService(JmxMixin, Service):
+
+    logs = {
+        "kafka_log": {
+            "path": "/mnt/kafka.log",
+            "collect_default": True},
+        "kafka_operational_logs": {
+            "path": "/mnt/kafka-operational-logs",
+            "collect_default": True},
+        "kafka_data": {
+            "path": "/mnt/kafka-data-logs",
+            "collect_default": False}
+    }
+
+    def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
+                 topics=None, version=TRUNK, quota_config=None, jmx_object_names=None, jmx_attributes=[]):
+        """
+        :type context
+        :type zk: ZookeeperService
+        :type topics: dict
+        """
+        Service.__init__(self, context, num_nodes)
+        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
+
+        self.zk = zk
+        self.quota_config = quota_config
+
+        self.security_protocol = security_protocol
+        self.interbroker_security_protocol = interbroker_security_protocol
+        self.topics = topics
+
+        for node in self.nodes:
+            node.version = version
+            node.config = KafkaConfig(**{config_property.BROKER_ID: self.idx(node)})
+
+    @property
+    def security_config(self):
+        if self.security_protocol == SecurityConfig.SSL or self.interbroker_security_protocol == SecurityConfig.SSL:
+            return SecurityConfig(SecurityConfig.SSL)
+        else:
+            return SecurityConfig(SecurityConfig.PLAINTEXT)
+
+    @property
+    def port(self):
+        return 9092 if self.security_protocol == SecurityConfig.PLAINTEXT else 9093
+
+    def start(self):
+        Service.start(self)
+
+        # Create topics if necessary
+        if self.topics is not None:
+            for topic, topic_cfg in self.topics.items():
+                if topic_cfg is None:
+                    topic_cfg = {}
+
+                topic_cfg["topic"] = topic
+                self.create_topic(topic_cfg)
+
+    def prop_file(self, node):
+        cfg = KafkaConfig(**node.config)
+        cfg[config_property.ADVERTISED_HOSTNAME] = node.account.hostname
+        cfg[config_property.ZOOKEEPER_CONNECT] = self.zk.connect_setting()
+
+        # TODO - clean up duplicate configuration logic
+        prop_file = cfg.render()
+        prop_file += self.render('kafka.properties', node=node, broker_id=self.idx(node),
+                                  security_config=self.security_config, port=self.port)
+        return prop_file
+
+    def start_cmd(self, node):
+        cmd = "export JMX_PORT=%d; " % self.jmx_port
+        cmd += "export LOG_DIR=/mnt/kafka-operational-logs/; "
+        cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log &"
+        return cmd
+
+    def start_node(self, node):
+        prop_file = self.prop_file(node)
+        self.logger.info("kafka.properties:")
+        self.logger.info(prop_file)
+        node.account.create_file("/mnt/kafka.properties", prop_file)
+
+        self.security_config.setup_node(node)
+
+        cmd = self.start_cmd(node)
+        self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
+        with node.account.monitor_log("/mnt/kafka.log") as monitor:
+            node.account.ssh(cmd)
+            monitor.wait_until("Kafka Server.*started", timeout_sec=30, err_msg="Kafka server didn't finish startup")
+
+        self.start_jmx_tool(self.idx(node), node)
+        if len(self.pids(node)) == 0:
+            raise Exception("No process ids recorded on node %s" % str(node))
+
+    def pids(self, node):
+        """Return process ids associated with running processes on the given node."""
+        try:
+            cmd = "ps ax | grep -i kafka | grep java | grep -v grep | awk '{print $1}'"
+
+            pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
+            return pid_arr
+        except (subprocess.CalledProcessError, ValueError) as e:
+            return []
+
+    def signal_node(self, node, sig=signal.SIGTERM):
+        pids = self.pids(node)
+        for pid in pids:
+            node.account.signal(pid, sig)
+
+    def signal_leader(self, topic, partition=0, sig=signal.SIGTERM):
+        leader = self.leader(topic, partition)
+        self.signal_node(leader, sig)
+
+    def stop_node(self, node, clean_shutdown=True):
+        pids = self.pids(node)
+        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
+
+        for pid in pids:
+            node.account.signal(pid, sig, allow_fail=False)
+        wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=20, err_msg="Kafka node failed to stop")
+
+    def clean_node(self, node):
+        JmxMixin.clean_node(self, node)
+        self.security_config.clean_node(node)
+        node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
+        node.account.ssh("rm -rf /mnt/*", allow_fail=False)
+
+    def create_topic(self, topic_cfg, node=None):
+        """Run the admin tool create topic command.
+        Specifying node is optional, and may be useful if different kafka nodes run different versions
+        and we care where the command gets run.
+
+        If the node is not specified, run the command from self.nodes[0]
+        """
+        if node is None:
+            node = self.nodes[0]
+        self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg)
+
+        cmd = "/opt/%s/bin/kafka-topics.sh " % kafka_dir(node)
+        cmd += "--zookeeper %(zk_connect)s --create --topic %(topic)s --partitions %(partitions)d --replication-factor %(replication)d" % {
+                'zk_connect': self.zk.connect_setting(),
+                'topic': topic_cfg.get("topic"),
+                'partitions': topic_cfg.get('partitions', 1),
+                'replication': topic_cfg.get('replication-factor', 1)
+            }
+
+        if "configs" in topic_cfg.keys() and topic_cfg["configs"] is not None:
+            for config_name, config_value in topic_cfg["configs"].items():
+                cmd += " --config %s=%s" % (config_name, str(config_value))
+
+        self.logger.info("Running topic creation command...\n%s" % cmd)
+        node.account.ssh(cmd)
+
+        time.sleep(1)
+        self.logger.info("Checking to see if topic was properly created...\n%s" % cmd)
+        for line in self.describe_topic(topic_cfg["topic"]).split("\n"):
+            self.logger.info(line)
+
+    def describe_topic(self, topic, node=None):
+        if node is None:
+            node = self.nodes[0]
+        cmd = "/opt/%s/bin/kafka-topics.sh --zookeeper %s --topic %s --describe" % \
+              (kafka_dir(node), self.zk.connect_setting(), topic)
+        output = ""
+        for line in node.account.ssh_capture(cmd):
+            output += line
+        return output
+
+    def verify_reassign_partitions(self, reassignment, node=None):
+        """Run the reassign partitions admin tool in "verify" mode
+        """
+        if node is None:
+            node = self.nodes[0]
+
+        json_file = "/tmp/%s_reassign.json" % str(time.time())
+
+        # reassignment to json; the second dumps() quotes/escapes the string so it survives echo on the remote shell
+        json_str = json.dumps(reassignment)
+        json_str = json.dumps(json_str)
+
+        # create command
+        cmd = "echo %s > %s && " % (json_str, json_file)
+        cmd += "/opt/%s/bin/kafka-reassign-partitions.sh " % kafka_dir(node)
+        cmd += "--zookeeper %s " % self.zk.connect_setting()
+        cmd += "--reassignment-json-file %s " % json_file
+        cmd += "--verify "
+        cmd += "&& sleep 1 && rm -f %s" % json_file
+
+        # send command
+        self.logger.info("Verifying partition reassignment...")
+        self.logger.debug(cmd)
+        output = ""
+        for line in node.account.ssh_capture(cmd):
+            output += line
+
+        self.logger.debug(output)
+
+        if re.match(".*is in progress.*", output) is not None:
+            return False
+
+        return True
+
+    def execute_reassign_partitions(self, reassignment, node=None):
+        """Run the reassign partitions admin tool in "execute" mode
+        """
+        if node is None:
+            node = self.nodes[0]
+        json_file = "/tmp/%s_reassign.json" % str(time.time())
+
+        # reassignment to json; the second dumps() quotes/escapes the string so it survives echo on the remote shell
+        json_str = json.dumps(reassignment)
+        json_str = json.dumps(json_str)
+
+        # create command
+        cmd = "echo %s > %s && " % (json_str, json_file)
+        cmd += "/opt/%s/bin/kafka-reassign-partitions.sh " % kafka_dir(node)
+        cmd += "--zookeeper %s " % self.zk.connect_setting()
+        cmd += "--reassignment-json-file %s " % json_file
+        cmd += "--execute"
+        cmd += " && sleep 1 && rm -f %s" % json_file
+
+        # send command
+        self.logger.info("Executing partition reassignment...")
+        self.logger.debug(cmd)
+        output = ""
+        for line in node.account.ssh_capture(cmd):
+            output += line
+
+        self.logger.debug("Partition reassignment output:")
+        self.logger.debug(output)
+
+    def restart_node(self, node, clean_shutdown=True):
+        """Restart the given node."""
+        self.stop_node(node, clean_shutdown)
+        self.start_node(node)
+
+    def leader(self, topic, partition=0):
+        """ Get the leader replica for the given topic and partition.
+        """
+        kafka_dir = KAFKA_TRUNK
+        cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " %\
+              (kafka_dir, self.zk.connect_setting())
+        cmd += "get /brokers/topics/%s/partitions/%d/state" % (topic, partition)
+        self.logger.debug(cmd)
+
+        node = self.zk.nodes[0]
+        self.logger.debug("Querying zookeeper to find leader replica for topic %s: \n%s" % (topic, cmd))
+        partition_state = None
+        for line in node.account.ssh_capture(cmd):
+            # loop through all lines in the output, but only hold on to the first match
+            if partition_state is None:
+                match = re.match("^({.+})$", line)
+                if match is not None:
+                    partition_state = match.groups()[0]
+
+        if partition_state is None:
+            raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
+
+        partition_state = json.loads(partition_state)
+        self.logger.info(partition_state)
+
+        leader_idx = int(partition_state["leader"])
+        self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx))
+        return self.get_node(leader_idx)
+
+    def bootstrap_servers(self):
+        """Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME2:PORT2,...
+        using the port for the configured security protocol.
+
+        This is the format expected by many config files.
+        """
+        return ','.join([node.account.hostname + ":" + str(self.port) for node in self.nodes])
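
As a rough usage sketch of the service (this assumes a ducktape Test method where self.test_context is available; the topic settings and import paths are one possible arrangement, not the canonical one):

    from kafkatest.services.zookeeper import ZookeeperService
    from kafkatest.services.kafka.kafka import KafkaService

    # inside a ducktape test method
    zk = ZookeeperService(self.test_context, num_nodes=1)
    zk.start()

    kafka = KafkaService(self.test_context, num_nodes=3, zk=zk,
                         topics={"test-topic": {"partitions": 3, "replication-factor": 3}})
    kafka.start()                                    # starts brokers, then creates the configured topics

    leader = kafka.leader("test-topic", partition=0)
    kafka.restart_node(leader, clean_shutdown=True)  # clean bounce of the current leader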

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
new file mode 100644
index 0000000..4db1120
--- /dev/null
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+
+advertised.host.name={{ node.account.hostname }}
+
+{% if security_protocol == interbroker_security_protocol %}
+listeners={{ security_protocol }}://:{{ port }}
+advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:{{ port }}
+{% else %}
+listeners=PLAINTEXT://:9092,SSL://:9093
+advertised.listeners=PLAINTEXT://{{ node.account.hostname }}:9092,SSL://{{ node.account.hostname }}:9093
+{% endif %}
+
+num.network.threads=3
+num.io.threads=8
+socket.send.buffer.bytes=102400
+socket.receive.buffer.bytes=65536
+socket.request.max.bytes=104857600
+
+num.partitions=1
+num.recovery.threads.per.data.dir=1
+log.retention.hours=168
+log.segment.bytes=1073741824
+log.retention.check.interval.ms=300000
+log.cleaner.enable=false
+
+{% if quota_config.quota_producer_default is defined and quota_config.quota_producer_default is not none %}
+quota.producer.default={{ quota_config.quota_producer_default }}
+{% endif %}
+
+{% if quota_config.quota_consumer_default is defined and quota_config.quota_consumer_default is not none %}
+quota.consumer.default={{ quota_config.quota_consumer_default }}
+{% endif %}
+
+{% if quota_config.quota_producer_bytes_per_second_overrides is defined and quota_config.quota_producer_bytes_per_second_overrides is not none %}
+quota.producer.bytes.per.second.overrides={{ quota_config.quota_producer_bytes_per_second_overrides }}
+{% endif %}
+
+{% if quota_config.quota_consumer_bytes_per_second_overrides is defined and quota_config.quota_consumer_bytes_per_second_overrides is not none %}
+quota.consumer.bytes.per.second.overrides={{ quota_config.quota_consumer_bytes_per_second_overrides }}
+{% endif %}
+
+security.inter.broker.protocol={{ interbroker_security_protocol }}
+
+ssl.keystore.location=/mnt/ssl/test.keystore.jks
+ssl.keystore.password=test-ks-passwd
+ssl.key.password=test-key-passwd
+ssl.keystore.type=JKS
+ssl.truststore.location=/mnt/ssl/test.truststore.jks
+ssl.truststore.password=test-ts-passwd
+ssl.truststore.type=JKS
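
For example, with matching protocols the template renders a single listener pair, while a mixed setup falls through to the hard-coded two-listener branch (the hostname below is illustrative):

    # security_protocol == interbroker_security_protocol == PLAINTEXT
    listeners=PLAINTEXT://:9092
    advertised.listeners=PLAINTEXT://worker1:9092

    # mixed PLAINTEXT / SSL
    listeners=PLAINTEXT://:9092,SSL://:9093
    advertised.listeners=PLAINTEXT://worker1:9092,SSL://worker1:9093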

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/kafka/version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/version.py b/tests/kafkatest/services/kafka/version.py
new file mode 100644
index 0000000..95f3448
--- /dev/null
+++ b/tests/kafkatest/services/kafka/version.py
@@ -0,0 +1,61 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.utils import kafkatest_version
+
+from distutils.version import LooseVersion
+
+
+class KafkaVersion(LooseVersion):
+    """Container for kafka versions which makes versions simple to compare.
+
+    distutils.version.LooseVersion (and StrictVersion) has robust comparison and ordering logic.
+
+    Example:
+
+        v10 = KafkaVersion("0.10.0")
+        v9 = KafkaVersion("0.9.0.1")
+        assert v10 > v9  # assertion passes!
+    """
+    def __init__(self, version_string):
+        self.is_trunk = (version_string.lower() == "trunk")
+        if self.is_trunk:
+            # Since "trunk" may actually be a branch that is not trunk,
+            # use kafkatest_version() for comparison purposes,
+            # and track whether we're in "trunk" with a flag
+            version_string = kafkatest_version()
+
+            # Drop dev suffix if present
+            dev_suffix_index = version_string.find(".dev")
+            if dev_suffix_index >= 0:
+                version_string = version_string[:dev_suffix_index]
+
+        # Don't use the form super(...).__init__(...) because
+        # LooseVersion is an "old style" python class
+        LooseVersion.__init__(self, version_string)
+
+    def __str__(self):
+        if self.is_trunk:
+            return "trunk"
+        else:
+            return LooseVersion.__str__(self)
+
+
+TRUNK = KafkaVersion("trunk")
+
+# 0.8.2.X versions
+V_0_8_2_1 = KafkaVersion("0.8.2.1")
+V_0_8_2_2 = KafkaVersion("0.8.2.2")
+LATEST_0_8_2 = V_0_8_2_2
+
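
A quick illustration of the comparison semantics (a sketch; the assertions follow directly from LooseVersion ordering and the trunk handling above):

    from kafkatest.services.kafka.version import KafkaVersion, TRUNK, V_0_8_2_1, LATEST_0_8_2

    assert V_0_8_2_1 < LATEST_0_8_2                # 0.8.2.1 < 0.8.2.2
    assert LATEST_0_8_2 < KafkaVersion("0.9.0.0")  # ordered like version numbers, not strings
    assert str(TRUNK) == "trunk"                   # prints as "trunk", compares as kafkatest_version()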

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/kafka_log4j_appender.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
index ff6bb18..46ad82e 100644
--- a/tests/kafkatest/services/kafka_log4j_appender.py
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -15,6 +15,8 @@
 
 from ducktape.services.background_thread import BackgroundThreadService
 
+from kafkatest.services.kafka.directory import kafka_dir
+
 
 class KafkaLog4jAppender(BackgroundThreadService):
 
@@ -32,14 +34,15 @@ class KafkaLog4jAppender(BackgroundThreadService):
         self.max_messages = max_messages
 
     def _worker(self, idx, node):
-        cmd = self.start_cmd
+        cmd = self.start_cmd(node)
         self.logger.debug("VerifiableKafkaLog4jAppender %d command: %s" % (idx, cmd))
         node.account.ssh(cmd)
 
-    @property
-    def start_cmd(self):
-        cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableLog4jAppender" \
-              " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
+    def start_cmd(self, node):
+        cmd = "/opt/%s/bin/" % kafka_dir(node)
+        cmd += "kafka-run-class.sh org.apache.kafka.tools.VerifiableLog4jAppender"
+        cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
+
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
index afbed13..a3b4928 100644
--- a/tests/kafkatest/services/mirror_maker.py
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -17,6 +17,8 @@
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
 
+from kafkatest.services.kafka.directory import kafka_dir
+
 import os
 import subprocess
 
@@ -63,7 +65,6 @@ class MirrorMaker(Service):
     LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
     PRODUCER_CONFIG = os.path.join(PERSISTENT_ROOT, "producer.properties")
     CONSUMER_CONFIG = os.path.join(PERSISTENT_ROOT, "consumer.properties")
-    KAFKA_HOME = "/opt/kafka/"
 
     logs = {
         "mirror_maker_log": {
@@ -101,7 +102,7 @@ class MirrorMaker(Service):
     def start_cmd(self, node):
         cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
-        cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % MirrorMaker.KAFKA_HOME
+        cmd += " /opt/%s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % kafka_dir(node)
         cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG
         cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG
         if isinstance(self.num_streams, int):

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/monitor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/monitor/__init__.py b/tests/kafkatest/services/monitor/__init__.py
new file mode 100644
index 0000000..ec20143
--- /dev/null
+++ b/tests/kafkatest/services/monitor/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/monitor/jmx.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
new file mode 100644
index 0000000..06c7dc8
--- /dev/null
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.kafka.directory import kafka_dir
+
+class JmxMixin(object):
+    """This mixin helps existing service subclasses start JmxTool on their worker nodes and collect jmx stats.
+
+    Note that this is not a service in its own right.
+    """
+    def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=[]):
+        self.jmx_object_names = jmx_object_names
+        self.jmx_attributes = jmx_attributes
+        self.jmx_port = 9192
+
+        self.started = [False] * num_nodes
+        self.jmx_stats = [{} for x in range(num_nodes)]
+        self.maximum_jmx_value = {}  # map from object_attribute_name to maximum value observed over time
+        self.average_jmx_value = {}  # map from object_attribute_name to average value observed over time
+
+    def clean_node(self, node):
+        node.account.kill_process("jmx", clean_shutdown=False, allow_fail=True)
+        node.account.ssh("rm -rf /mnt/jmx_tool.log", allow_fail=False)
+
+    def start_jmx_tool(self, idx, node):
+        if self.started[idx-1] or self.jmx_object_names is None:
+            return
+
+        cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.JmxTool " \
+              "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % (kafka_dir(node), self.jmx_port)
+        for jmx_object_name in self.jmx_object_names:
+            cmd += " --object-name %s" % jmx_object_name
+        for jmx_attribute in self.jmx_attributes:
+            cmd += " --attributes %s" % jmx_attribute
+        cmd += " | tee -a /mnt/jmx_tool.log"
+
+        self.logger.debug("Start JmxTool %d command: %s", idx, cmd)
+        jmx_output = node.account.ssh_capture(cmd, allow_fail=False)
+        jmx_output.next()
+
+        self.started[idx-1] = True
+
+    def read_jmx_output(self, idx, node):
+        if not self.started[idx-1]:
+            return
+
+        object_attribute_names = []
+
+        cmd = "cat /mnt/jmx_tool.log"
+        self.logger.debug("Read jmx output %d command: %s", idx, cmd)
+        for line in node.account.ssh_capture(cmd, allow_fail=False):
+            if "time" in line:
+                object_attribute_names = line.strip()[1:-1].split("\",\"")[1:]
+                continue
+            stats = [float(field) for field in line.split(',')]
+            time_sec = int(stats[0]/1000)
+            self.jmx_stats[idx-1][time_sec] = {name : stats[i+1] for i, name in enumerate(object_attribute_names)}
+
+        # do not calculate average and maximum of jmx stats until we have read output from all nodes
+        if any(len(time_to_stats) == 0 for time_to_stats in self.jmx_stats):
+            return
+
+        start_time_sec = min([min(time_to_stats.keys()) for time_to_stats in self.jmx_stats])
+        end_time_sec = max([max(time_to_stats.keys()) for time_to_stats in self.jmx_stats])
+
+        for name in object_attribute_names:
+            aggregates_per_time = []
+            for time_sec in xrange(start_time_sec, end_time_sec + 1):
+                # assume that value is 0 if it is not read by jmx tool at the given time. This is appropriate for metrics such as bandwidth
+                values_per_node = [time_to_stats.get(time_sec, {}).get(name, 0) for time_to_stats in self.jmx_stats]
+                # assume that value is aggregated across nodes by sum. This is appropriate for metrics such as bandwidth
+                aggregates_per_time.append(sum(values_per_node))
+            self.average_jmx_value[name] = sum(aggregates_per_time) / len(aggregates_per_time)
+            self.maximum_jmx_value[name] = max(aggregates_per_time)
+
+    def read_jmx_output_all_nodes(self):
+        for node in self.nodes:
+            self.read_jmx_output(self.idx(node), node)
\ No newline at end of file
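
A hedged sketch of how a service consumes this mixin (KafkaService above follows the same pattern; the MBean name and attribute are illustrative choices, and the exact key format in maximum_jmx_value depends on JmxTool's CSV header):

    kafka = KafkaService(test_context, num_nodes=3, zk=zk,
                         jmx_object_names=["kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec"],
                         jmx_attributes=["OneMinuteRate"])
    kafka.start()
    # ... run the workload under test ...
    kafka.read_jmx_output_all_nodes()
    for name, peak in kafka.maximum_jmx_value.items():
        print("%s peaked at %f (summed across brokers)" % (name, peak))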

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/performance/consumer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py
index 053059b..e52220c 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from kafkatest.services.performance import PerformanceService
+from kafkatest.services.kafka.directory import kafka_dir
 from kafkatest.utils.security_config import SecurityConfig
 
 import os
@@ -120,11 +121,10 @@ class ConsumerPerformanceService(PerformanceService):
 
         return args
 
-    @property
-    def start_cmd(self):
+    def start_cmd(self, node):
         cmd = "export LOG_DIR=%s;" % ConsumerPerformanceService.LOG_DIR
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsumerPerformanceService.LOG4J_CONFIG
-        cmd += " /opt/kafka/bin/kafka-consumer-perf-test.sh"
+        cmd += " /opt/%s/bin/kafka-consumer-perf-test.sh" % kafka_dir(node)
         for key, value in self.args.items():
             cmd += " --%s %s" % (key, value)
         cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE
@@ -144,7 +144,7 @@ class ConsumerPerformanceService(PerformanceService):
         node.account.create_file(ConsumerPerformanceService.CONFIG_FILE, str(self.security_config))
         self.security_config.setup_node(node)
 
-        cmd = self.start_cmd
+        cmd = self.start_cmd(node)
         self.logger.debug("Consumer performance %d command: %s", idx, cmd)
         last = None
         for line in node.account.ssh_capture(cmd):

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/performance/end_to_end_latency.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py
index 0559a43..2be1621 100644
--- a/tests/kafkatest/services/performance/end_to_end_latency.py
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -16,6 +16,8 @@
 from kafkatest.services.performance import PerformanceService
 from kafkatest.utils.security_config import SecurityConfig
 
+from kafkatest.services.kafka.directory import kafka_dir
+
 
 class EndToEndLatencyService(PerformanceService):
 
@@ -51,10 +53,8 @@ class EndToEndLatencyService(PerformanceService):
             'ssl_config_file': ssl_config_file
         })
 
-        cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
-              "%(bootstrap_servers)s %(topic)s %(num_records)d "\
-              "%(acks)d 20 %(ssl_config_file)s" % args
-
+        cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % kafka_dir(node)
+        cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d 20 %(ssl_config_file)s" % args
         cmd += " | tee /mnt/end-to-end-latency.log"
 
         self.logger.debug("End-to-end latency %d command: %s", idx, cmd)