Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:42 UTC

[26/50] [abbrv] kafka git commit: KAFKA-3483: Restructure ducktape tests to simplify running subsets of tests

KAFKA-3483: Restructure ducktape tests to simplify running subsets of tests


Author: Grant Henke <gr...@gmail.com>

Reviewers: Geoff Anderson <ge...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1162 from granthenke/ducktape-structure


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45c585b4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45c585b4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45c585b4

Branch: refs/heads/0.10.0
Commit: 45c585b4f7e3d5e5dd5297b4d121badbd2052922
Parents: 83cf385
Author: Grant Henke <gr...@gmail.com>
Authored: Sun Apr 3 20:04:36 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Sun Apr 3 20:04:36 2016 -0700

----------------------------------------------------------------------
 tests/kafkatest/benchmarks/__init__.py          |  14 +
 tests/kafkatest/benchmarks/core/__init__.py     |  15 +
 .../kafkatest/benchmarks/core/benchmark_test.py | 253 ++++++++++++++++
 tests/kafkatest/tests/benchmark_test.py         | 253 ----------------
 tests/kafkatest/tests/client/__init__.py        |  15 +
 .../kafkatest/tests/client/compression_test.py  |  85 ++++++
 .../client/consumer_rolling_upgrade_test.py     |  82 ++++++
 tests/kafkatest/tests/client/consumer_test.py   | 291 +++++++++++++++++++
 .../tests/client/message_format_change_test.py  |  92 ++++++
 tests/kafkatest/tests/client/quota_test.py      | 170 +++++++++++
 .../tests/compatibility_test_new_broker_test.py |  78 -----
 tests/kafkatest/tests/compression_test.py       |  85 ------
 tests/kafkatest/tests/connect/__init__.py       |  15 +
 .../tests/connect/connect_distributed_test.py   | 210 +++++++++++++
 .../tests/connect/connect_rest_test.py          | 165 +++++++++++
 tests/kafkatest/tests/connect/connect_test.py   |  93 ++++++
 .../templates/connect-distributed.properties    |  46 +++
 .../templates/connect-file-sink.properties      |  20 ++
 .../templates/connect-file-source.properties    |  20 ++
 .../templates/connect-standalone.properties     |  32 ++
 .../kafkatest/tests/connect_distributed_test.py | 210 -------------
 tests/kafkatest/tests/connect_rest_test.py      | 165 -----------
 tests/kafkatest/tests/connect_test.py           |  93 ------
 .../tests/consumer_group_command_test.py        | 106 -------
 .../tests/consumer_rolling_upgrade_test.py      |  82 ------
 tests/kafkatest/tests/consumer_test.py          | 291 -------------------
 tests/kafkatest/tests/core/__init__.py          |  15 +
 .../core/compatibility_test_new_broker_test.py  |  78 +++++
 .../tests/core/consumer_group_command_test.py   | 106 +++++++
 .../tests/core/get_offset_shell_test.py         |  91 ++++++
 tests/kafkatest/tests/core/mirror_maker_test.py | 179 ++++++++++++
 .../tests/core/reassign_partitions_test.py      | 110 +++++++
 tests/kafkatest/tests/core/replication_test.py  | 152 ++++++++++
 .../tests/core/security_rolling_upgrade_test.py | 128 ++++++++
 .../tests/core/simple_consumer_shell_test.py    |  75 +++++
 tests/kafkatest/tests/core/upgrade_test.py      | 105 +++++++
 .../core/zookeeper_security_upgrade_test.py     | 117 ++++++++
 tests/kafkatest/tests/get_offset_shell_test.py  |  91 ------
 tests/kafkatest/tests/log4j_appender_test.py    |  93 ------
 .../tests/message_format_change_test.py         |  92 ------
 tests/kafkatest/tests/mirror_maker_test.py      | 179 ------------
 tests/kafkatest/tests/quota_test.py             | 170 -----------
 .../kafkatest/tests/reassign_partitions_test.py | 110 -------
 tests/kafkatest/tests/replication_test.py       | 152 ----------
 .../tests/security_rolling_upgrade_test.py      | 128 --------
 .../tests/simple_consumer_shell_test.py         |  75 -----
 tests/kafkatest/tests/streams/__init__.py       |  15 +
 .../tests/streams/streams_bounce_test.py        |  73 +++++
 .../tests/streams/streams_smoke_test.py         |  75 +++++
 tests/kafkatest/tests/streams_bounce_test.py    |  73 -----
 tests/kafkatest/tests/streams_smoke_test.py     |  75 -----
 .../templates/connect-distributed.properties    |  46 ---
 .../templates/connect-file-sink.properties      |  20 --
 .../templates/connect-file-source.properties    |  20 --
 .../templates/connect-standalone.properties     |  32 --
 tests/kafkatest/tests/tools/__init__.py         |  15 +
 .../tests/tools/log4j_appender_test.py          |  93 ++++++
 tests/kafkatest/tests/upgrade_test.py           | 105 -------
 .../tests/zookeeper_security_upgrade_test.py    | 117 --------
 59 files changed, 3045 insertions(+), 2941 deletions(-)
----------------------------------------------------------------------
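
With the tests now grouped into per-area packages (benchmarks, client, connect, core, streams, tools), a subset can be selected by pointing the ducktape runner at a directory or a single module, for example "ducktape tests/kafkatest/tests/client" or "ducktape tests/kafkatest/tests/core/replication_test.py". These invocations are illustrative; the exact runner options depend on the local test environment.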


http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/benchmarks/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/__init__.py b/tests/kafkatest/benchmarks/__init__.py
new file mode 100644
index 0000000..ec20143
--- /dev/null
+++ b/tests/kafkatest/benchmarks/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/benchmarks/core/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/core/__init__.py b/tests/kafkatest/benchmarks/core/__init__.py
new file mode 100644
index 0000000..ebc9bb3
--- /dev/null
+++ b/tests/kafkatest/benchmarks/core/__init__.py
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/benchmarks/core/benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py
new file mode 100644
index 0000000..9c2e32d
--- /dev/null
+++ b/tests/kafkatest/benchmarks/core/benchmark_test.py
@@ -0,0 +1,253 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.service import Service
+from ducktape.tests.test import Test
+from ducktape.mark import parametrize
+from ducktape.mark import matrix
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService
+
+
+TOPIC_REP_ONE = "topic-replication-factor-one"
+TOPIC_REP_THREE = "topic-replication-factor-three"
+DEFAULT_RECORD_SIZE = 100  # bytes
+
+
+class Benchmark(Test):
+    """A benchmark of Kafka producer/consumer performance. This replicates the test
+    run here:
+    https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
+    """
+    def __init__(self, test_context):
+        super(Benchmark, self).__init__(test_context)
+        self.num_zk = 1
+        self.num_brokers = 3
+        self.topics = {
+            TOPIC_REP_ONE: {'partitions': 6, 'replication-factor': 1},
+            TOPIC_REP_THREE: {'partitions': 6, 'replication-factor': 3}
+        }
+
+        self.zk = ZookeeperService(test_context, self.num_zk)
+
+        self.msgs_large = 10000000
+        self.batch_size = 8*1024
+        self.buffer_memory = 64*1024*1024
+        self.msg_sizes = [10, 100, 1000, 10000, 100000]
+        self.target_data_size = 128*1024*1024
+        self.target_data_size_gb = self.target_data_size/float(1024*1024*1024)
+
+    def setUp(self):
+        self.zk.start()
+
+    def start_kafka(self, security_protocol, interbroker_security_protocol):
+        self.kafka = KafkaService(
+            self.test_context, self.num_brokers,
+            self.zk, security_protocol=security_protocol,
+            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
+        self.kafka.log_level = "INFO"  # We don't need DEBUG logging here
+        self.kafka.start()
+
+    @parametrize(acks=1, topic=TOPIC_REP_ONE)
+    @parametrize(acks=1, topic=TOPIC_REP_THREE)
+    @parametrize(acks=-1, topic=TOPIC_REP_THREE)
+    @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
+    @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], security_protocol=['PLAINTEXT', 'SSL'])
+    def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT'):
+        """
+        Setup: 1 node zk + 3 node kafka cluster
+        Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor,
+        security protocol and message size are varied depending on arguments injected into this test.
+
+        Collect and return aggregate throughput statistics after all messages have been acknowledged.
+        (This runs ProducerPerformance.java under the hood)
+        """
+        self.start_kafka(security_protocol, security_protocol)
+        # Always generate the same total amount of data
+        nrecords = int(self.target_data_size / message_size)
+
+        self.producer = ProducerPerformanceService(
+            self.test_context, num_producers, self.kafka, topic=topic,
+            num_records=nrecords, record_size=message_size,  throughput=-1,
+            settings={
+                'acks': acks,
+                'batch.size': self.batch_size,
+                'buffer.memory': self.buffer_memory})
+        self.producer.run()
+        return compute_aggregate_throughput(self.producer)
+
+    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+    def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol=None):
+        """
+        Setup: 1 node zk + 3 node kafka cluster
+        Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.
+
+        Collect and return aggregate throughput statistics after all messages have been acknowledged.
+
+        (This runs ProducerPerformance.java under the hood)
+        """
+        if interbroker_security_protocol is None:
+            interbroker_security_protocol = security_protocol
+        self.start_kafka(security_protocol, interbroker_security_protocol)
+        self.producer = ProducerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
+            throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
+            intermediate_stats=True
+        )
+        self.producer.run()
+
+        summary = ["Throughput over long run, data > memory:"]
+        data = {}
+        # FIXME we should be generating a graph too
+        # Try to break it into 5 blocks, but fall back to a smaller number if
+        # there aren't even 5 elements
+        block_size = max(len(self.producer.stats[0]) / 5, 1)
+        nblocks = len(self.producer.stats[0]) / block_size
+
+        for i in range(nblocks):
+            subset = self.producer.stats[0][i*block_size:min((i+1)*block_size, len(self.producer.stats[0]))]
+            if len(subset) == 0:
+                summary.append(" Time block %d: (empty)" % i)
+                data[i] = None
+            else:
+                records_per_sec = sum([stat['records_per_sec'] for stat in subset])/float(len(subset))
+                mb_per_sec = sum([stat['mbps'] for stat in subset])/float(len(subset))
+
+                summary.append(" Time block %d: %f rec/sec (%f MB/s)" % (i, records_per_sec, mb_per_sec))
+                data[i] = throughput(records_per_sec, mb_per_sec)
+
+        self.logger.info("\n".join(summary))
+        return data
+
+
+    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
+    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
+    def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol=None):
+        """
+        Setup: 1 node zk + 3 node kafka cluster
+        Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3,
+        measuring the latency between production and consumption of each message.
+
+        Return aggregate latency statistics.
+
+        (Under the hood, this simply runs EndToEndLatency.scala)
+        """
+        if interbroker_security_protocol is None:
+            interbroker_security_protocol = security_protocol
+        self.start_kafka(security_protocol, interbroker_security_protocol)
+        self.logger.info("BENCHMARK: End to end latency")
+        self.perf = EndToEndLatencyService(
+            self.test_context, 1, self.kafka,
+            topic=TOPIC_REP_THREE, num_records=10000
+        )
+        self.perf.run()
+        return latency(self.perf.results[0]['latency_50th_ms'],  self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
+
+    @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
+    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+    def test_producer_and_consumer(self, security_protocol, interbroker_security_protocol=None, new_consumer=True):
+        """
+        Setup: 1 node zk + 3 node kafka cluster
+        Concurrently produce and consume 10e6 messages with a single producer and a single consumer,
+        using new consumer if new_consumer == True
+
+        Return aggregate throughput statistics for both producer and consumer.
+
+        (Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala)
+        """
+        if interbroker_security_protocol is None:
+            interbroker_security_protocol = security_protocol
+        self.start_kafka(security_protocol, interbroker_security_protocol)
+        num_records = 10 * 1000 * 1000  # 10e6
+
+        self.producer = ProducerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic=TOPIC_REP_THREE,
+            num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
+            settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
+        )
+        self.consumer = ConsumerPerformanceService(
+            self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
+        Service.run_parallel(self.producer, self.consumer)
+
+        data = {
+            "producer": compute_aggregate_throughput(self.producer),
+            "consumer": compute_aggregate_throughput(self.consumer)
+        }
+        summary = [
+            "Producer + consumer:",
+            str(data)]
+        self.logger.info("\n".join(summary))
+        return data
+
+    @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
+    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+    def test_consumer_throughput(self, security_protocol, interbroker_security_protocol=None, new_consumer=True, num_consumers=1):
+        """
+        Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions
+        (using new consumer iff new_consumer == True), and report throughput.
+        """
+        if interbroker_security_protocol is None:
+            interbroker_security_protocol = security_protocol
+        self.start_kafka(security_protocol, interbroker_security_protocol)
+        num_records = 10 * 1000 * 1000  # 10e6
+
+        # seed kafka w/messages
+        self.producer = ProducerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic=TOPIC_REP_THREE,
+            num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
+            settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
+        )
+        self.producer.run()
+
+        # consume
+        self.consumer = ConsumerPerformanceService(
+            self.test_context, num_consumers, self.kafka,
+            topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
+        self.consumer.group = "test-consumer-group"
+        self.consumer.run()
+        return compute_aggregate_throughput(self.consumer)
+
+
+def throughput(records_per_sec, mb_per_sec):
+    """Helper method to ensure uniform representation of throughput data"""
+    return {
+        "records_per_sec": records_per_sec,
+        "mb_per_sec": mb_per_sec
+    }
+
+
+def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms):
+    """Helper method to ensure uniform representation of latency data"""
+    return {
+        "latency_50th_ms": latency_50th_ms,
+        "latency_99th_ms": latency_99th_ms,
+        "latency_999th_ms": latency_999th_ms
+    }
+
+
+def compute_aggregate_throughput(perf):
+    """Helper method for computing throughput after running a performance service."""
+    aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
+    aggregate_mbps = sum([r['mbps'] for r in perf.results])
+
+    return throughput(aggregate_rate, aggregate_mbps)
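
The benchmark above relies on ducktape's @parametrize and @matrix decorators to generate its test cases. The following standalone sketch (written for this note, not part of the commit) models the expansion semantics under the assumption that each @parametrize contributes one case with the given keyword arguments while @matrix contributes the cross product of the listed values; that expansion is what makes narrow subsets, such as a single message_size/security_protocol combination, selectable at run time.

from itertools import product

def expand_matrix(**params):
    """Cross product of the value lists; one dict per generated test case."""
    keys = list(params)
    return [dict(zip(keys, combo)) for combo in product(*params.values())]

# Cases contributed by the four @parametrize decorators on test_producer_throughput
parametrized = [
    {'acks': 1, 'topic': 'topic-replication-factor-one'},
    {'acks': 1, 'topic': 'topic-replication-factor-three'},
    {'acks': -1, 'topic': 'topic-replication-factor-three'},
    {'acks': 1, 'topic': 'topic-replication-factor-three', 'num_producers': 3},
]

# Cases contributed by @matrix: 1 * 1 * 5 * 2 = 10 combinations
matrixed = expand_matrix(acks=[1], topic=['topic-replication-factor-three'],
                         message_size=[10, 100, 1000, 10000, 100000],
                         security_protocol=['PLAINTEXT', 'SSL'])

print(len(parametrized) + len(matrixed))  # 14 generated cases in total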

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/benchmark_test.py b/tests/kafkatest/tests/benchmark_test.py
deleted file mode 100644
index 9c2e32d..0000000
--- a/tests/kafkatest/tests/benchmark_test.py
+++ /dev/null
@@ -1,253 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.services.service import Service
-from ducktape.tests.test import Test
-from ducktape.mark import parametrize
-from ducktape.mark import matrix
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService
-
-
-TOPIC_REP_ONE = "topic-replication-factor-one"
-TOPIC_REP_THREE = "topic-replication-factor-three"
-DEFAULT_RECORD_SIZE = 100  # bytes
-
-
-class Benchmark(Test):
-    """A benchmark of Kafka producer/consumer performance. This replicates the test
-    run here:
-    https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
-    """
-    def __init__(self, test_context):
-        super(Benchmark, self).__init__(test_context)
-        self.num_zk = 1
-        self.num_brokers = 3
-        self.topics = {
-            TOPIC_REP_ONE: {'partitions': 6, 'replication-factor': 1},
-            TOPIC_REP_THREE: {'partitions': 6, 'replication-factor': 3}
-        }
-
-        self.zk = ZookeeperService(test_context, self.num_zk)
-
-        self.msgs_large = 10000000
-        self.batch_size = 8*1024
-        self.buffer_memory = 64*1024*1024
-        self.msg_sizes = [10, 100, 1000, 10000, 100000]
-        self.target_data_size = 128*1024*1024
-        self.target_data_size_gb = self.target_data_size/float(1024*1024*1024)
-
-    def setUp(self):
-        self.zk.start()
-
-    def start_kafka(self, security_protocol, interbroker_security_protocol):
-        self.kafka = KafkaService(
-            self.test_context, self.num_brokers,
-            self.zk, security_protocol=security_protocol,
-            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
-        self.kafka.log_level = "INFO"  # We don't need DEBUG logging here
-        self.kafka.start()
-
-    @parametrize(acks=1, topic=TOPIC_REP_ONE)
-    @parametrize(acks=1, topic=TOPIC_REP_THREE)
-    @parametrize(acks=-1, topic=TOPIC_REP_THREE)
-    @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
-    @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], security_protocol=['PLAINTEXT', 'SSL'])
-    def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT'):
-        """
-        Setup: 1 node zk + 3 node kafka cluster
-        Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor,
-        security protocol and message size are varied depending on arguments injected into this test.
-
-        Collect and return aggregate throughput statistics after all messages have been acknowledged.
-        (This runs ProducerPerformance.java under the hood)
-        """
-        self.start_kafka(security_protocol, security_protocol)
-        # Always generate the same total amount of data
-        nrecords = int(self.target_data_size / message_size)
-
-        self.producer = ProducerPerformanceService(
-            self.test_context, num_producers, self.kafka, topic=topic,
-            num_records=nrecords, record_size=message_size,  throughput=-1,
-            settings={
-                'acks': acks,
-                'batch.size': self.batch_size,
-                'buffer.memory': self.buffer_memory})
-        self.producer.run()
-        return compute_aggregate_throughput(self.producer)
-
-    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol=None):
-        """
-        Setup: 1 node zk + 3 node kafka cluster
-        Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.
-
-        Collect and return aggregate throughput statistics after all messages have been acknowledged.
-
-        (This runs ProducerPerformance.java under the hood)
-        """
-        if interbroker_security_protocol is None:
-            interbroker_security_protocol = security_protocol
-        self.start_kafka(security_protocol, interbroker_security_protocol)
-        self.producer = ProducerPerformanceService(
-            self.test_context, 1, self.kafka,
-            topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
-            throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
-            intermediate_stats=True
-        )
-        self.producer.run()
-
-        summary = ["Throughput over long run, data > memory:"]
-        data = {}
-        # FIXME we should be generating a graph too
-        # Try to break it into 5 blocks, but fall back to a smaller number if
-        # there aren't even 5 elements
-        block_size = max(len(self.producer.stats[0]) / 5, 1)
-        nblocks = len(self.producer.stats[0]) / block_size
-
-        for i in range(nblocks):
-            subset = self.producer.stats[0][i*block_size:min((i+1)*block_size, len(self.producer.stats[0]))]
-            if len(subset) == 0:
-                summary.append(" Time block %d: (empty)" % i)
-                data[i] = None
-            else:
-                records_per_sec = sum([stat['records_per_sec'] for stat in subset])/float(len(subset))
-                mb_per_sec = sum([stat['mbps'] for stat in subset])/float(len(subset))
-
-                summary.append(" Time block %d: %f rec/sec (%f MB/s)" % (i, records_per_sec, mb_per_sec))
-                data[i] = throughput(records_per_sec, mb_per_sec)
-
-        self.logger.info("\n".join(summary))
-        return data
-
-
-    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
-    def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol=None):
-        """
-        Setup: 1 node zk + 3 node kafka cluster
-        Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3,
-        measuring the latency between production and consumption of each message.
-
-        Return aggregate latency statistics.
-
-        (Under the hood, this simply runs EndToEndLatency.scala)
-        """
-        if interbroker_security_protocol is None:
-            interbroker_security_protocol = security_protocol
-        self.start_kafka(security_protocol, interbroker_security_protocol)
-        self.logger.info("BENCHMARK: End to end latency")
-        self.perf = EndToEndLatencyService(
-            self.test_context, 1, self.kafka,
-            topic=TOPIC_REP_THREE, num_records=10000
-        )
-        self.perf.run()
-        return latency(self.perf.results[0]['latency_50th_ms'],  self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
-
-    @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
-    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_producer_and_consumer(self, security_protocol, interbroker_security_protocol=None, new_consumer=True):
-        """
-        Setup: 1 node zk + 3 node kafka cluster
-        Concurrently produce and consume 10e6 messages with a single producer and a single consumer,
-        using new consumer if new_consumer == True
-
-        Return aggregate throughput statistics for both producer and consumer.
-
-        (Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala)
-        """
-        if interbroker_security_protocol is None:
-            interbroker_security_protocol = security_protocol
-        self.start_kafka(security_protocol, interbroker_security_protocol)
-        num_records = 10 * 1000 * 1000  # 10e6
-
-        self.producer = ProducerPerformanceService(
-            self.test_context, 1, self.kafka,
-            topic=TOPIC_REP_THREE,
-            num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
-            settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
-        )
-        self.consumer = ConsumerPerformanceService(
-            self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
-        Service.run_parallel(self.producer, self.consumer)
-
-        data = {
-            "producer": compute_aggregate_throughput(self.producer),
-            "consumer": compute_aggregate_throughput(self.consumer)
-        }
-        summary = [
-            "Producer + consumer:",
-            str(data)]
-        self.logger.info("\n".join(summary))
-        return data
-
-    @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
-    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_consumer_throughput(self, security_protocol, interbroker_security_protocol=None, new_consumer=True, num_consumers=1):
-        """
-        Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions
-        (using new consumer iff new_consumer == True), and report throughput.
-        """
-        if interbroker_security_protocol is None:
-            interbroker_security_protocol = security_protocol
-        self.start_kafka(security_protocol, interbroker_security_protocol)
-        num_records = 10 * 1000 * 1000  # 10e6
-
-        # seed kafka w/messages
-        self.producer = ProducerPerformanceService(
-            self.test_context, 1, self.kafka,
-            topic=TOPIC_REP_THREE,
-            num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
-            settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
-        )
-        self.producer.run()
-
-        # consume
-        self.consumer = ConsumerPerformanceService(
-            self.test_context, num_consumers, self.kafka,
-            topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
-        self.consumer.group = "test-consumer-group"
-        self.consumer.run()
-        return compute_aggregate_throughput(self.consumer)
-
-
-def throughput(records_per_sec, mb_per_sec):
-    """Helper method to ensure uniform representation of throughput data"""
-    return {
-        "records_per_sec": records_per_sec,
-        "mb_per_sec": mb_per_sec
-    }
-
-
-def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms):
-    """Helper method to ensure uniform representation of latency data"""
-    return {
-        "latency_50th_ms": latency_50th_ms,
-        "latency_99th_ms": latency_99th_ms,
-        "latency_999th_ms": latency_999th_ms
-    }
-
-
-def compute_aggregate_throughput(perf):
-    """Helper method for computing throughput after running a performance service."""
-    aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
-    aggregate_mbps = sum([r['mbps'] for r in perf.results])
-
-    return throughput(aggregate_rate, aggregate_mbps)

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/client/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/__init__.py b/tests/kafkatest/tests/client/__init__.py
new file mode 100644
index 0000000..ebc9bb3
--- /dev/null
+++ b/tests/kafkatest/tests/client/__init__.py
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/client/compression_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/compression_test.py b/tests/kafkatest/tests/client/compression_test.py
new file mode 100644
index 0000000..0de53ae
--- /dev/null
+++ b/tests/kafkatest/tests/client/compression_test.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int_with_prefix
+
+class CompressionTest(ProduceConsumeValidateTest):
+    """
+    These tests validate produce / consume for compressed topics.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(CompressionTest, self).__init__(test_context=test_context)
+
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
+                                                                    "partitions": 10,
+                                                                    "replication-factor": 1}})
+        self.num_partitions = 10
+        self.timeout_sec = 60
+        self.producer_throughput = 1000
+        self.num_producers = 4
+        self.messages_per_producer = 1000
+        self.num_consumers = 1
+
+    def setUp(self):
+        self.zk.start()
+
+    def min_cluster_size(self):
+        # Override this since we're adding services outside of the constructor
+        return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+    @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=True)
+    @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=False)
+    def test_compressed_topic(self, compression_types, new_consumer):
+        """Test produce => consume => validate for compressed topics
+        Setup: 1 zk, 1 kafka node, 1 topic with partitions=10, replication-factor=1
+
+        compression_types parameter gives a list of compression types (or no compression if
+        "none"). Each producer in a VerifiableProducer group (num_producers = 4) will use a
+        compression type from the list based on the producer's index in the group.
+
+            - Produce messages in the background
+            - Consume messages in the background
+            - Stop producing, and finish consuming
+            - Validate that every acked message was consumed
+        """
+
+        self.kafka.security_protocol = "PLAINTEXT"
+        self.kafka.interbroker_security_protocol = self.kafka.security_protocol
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic, throughput=self.producer_throughput,
+                                           message_validator=is_int_with_prefix,
+                                           compression_types=compression_types)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic,
+                                        new_consumer=new_consumer, consumer_timeout_ms=60000,
+                                        message_validator=is_int_with_prefix)
+        self.kafka.start()
+
+        self.run_produce_consume_validate(lambda: wait_until(
+            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+            timeout_sec=120, backoff_sec=1,
+            err_msg="Producer did not produce all messages in reasonable amount of time"))
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
new file mode 100644
index 0000000..3cd3c7c
--- /dev/null
+++ b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition
+
+class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 4
+    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
+    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
+
+    def __init__(self, test_context):
+        super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_consumers=2, num_producers=0,
+                                                         num_zk=1, num_brokers=1, topics={
+            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }
+        })
+
+    def _verify_range_assignment(self, consumer):
+        # range assignment should give us two partition sets: (0, 1) and (2, 3)
+        assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()])
+        assert assignment == set([
+            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]),
+            frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])])
+
+    def _verify_roundrobin_assignment(self, consumer):
+        assignment = set([frozenset(x) for x in consumer.current_assignment().values()])
+        assert assignment == set([
+            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
+            frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
+
+    def rolling_update_test(self):
+        """
+        Verify rolling updates of partition assignment strategies work correctly. In this
+        test, we use a rolling restart to change the group's assignment strategy from "range" 
+        to "roundrobin." We verify after every restart that all members are still in the group
+        and that the correct assignment strategy was used.
+        """
+
+        # initialize the consumer using range assignment
+        consumer = self.setup_consumer(self.TOPIC, assignment_strategy=self.RANGE)
+
+        consumer.start()
+        self.await_all_members(consumer)
+        self._verify_range_assignment(consumer)
+
+        # change consumer configuration to prefer round-robin assignment, but still support range assignment
+        consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE
+
+        # restart one of the nodes and verify that we are still using range assignment
+        consumer.stop_node(consumer.nodes[0])
+        consumer.start_node(consumer.nodes[0])
+        self.await_all_members(consumer)
+        self._verify_range_assignment(consumer)
+        
+        # now restart the other node and verify that we have switched to round-robin
+        consumer.stop_node(consumer.nodes[1])
+        consumer.start_node(consumer.nodes[1])
+        self.await_all_members(consumer)
+        self._verify_roundrobin_assignment(consumer)
+
+        # if we want, we can now drop support for range assignment
+        consumer.assignment_strategy = self.ROUND_ROBIN
+        for node in consumer.nodes:
+            consumer.stop_node(node)
+            consumer.start_node(node)
+            self.await_all_members(consumer)
+            self._verify_roundrobin_assignment(consumer)
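
The two _verify_* helpers above hard-code the expected layouts for 4 partitions spread over 2 consumers. A simplified standalone model of the two assignors (an illustration for this note, not the actual client code) reproduces those layouts:

def range_assign(num_partitions, num_consumers):
    # Contiguous chunks, with any remainder going to the first consumers.
    per, extra = divmod(num_partitions, num_consumers)
    out, start = [], 0
    for i in range(num_consumers):
        n = per + (1 if i < extra else 0)
        out.append(list(range(start, start + n)))
        start += n
    return out

def round_robin_assign(num_partitions, num_consumers):
    # Deal partitions out one at a time, cycling through the consumers.
    return [list(range(i, num_partitions, num_consumers)) for i in range(num_consumers)]

print(range_assign(4, 2))        # [[0, 1], [2, 3]]
print(round_robin_assign(4, 2))  # [[0, 2], [1, 3]]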

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/client/consumer_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
new file mode 100644
index 0000000..084b19d
--- /dev/null
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -0,0 +1,291 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition
+
+import signal
+
+class OffsetValidationTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 1
+
+    def __init__(self, test_context):
+        super(OffsetValidationTest, self).__init__(test_context, num_consumers=3, num_producers=1,
+                                                     num_zk=1, num_brokers=2, topics={
+            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 2 }
+        })
+
+    def rolling_bounce_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+        for _ in range(num_bounces):
+            for node in consumer.nodes:
+                consumer.stop_node(node, clean_shutdown)
+
+                wait_until(lambda: len(consumer.dead_nodes()) == 1,
+                           timeout_sec=self.session_timeout_sec+5,
+                           err_msg="Timed out waiting for the consumer to shutdown")
+
+                consumer.start_node(node)
+
+                self.await_all_members(consumer)
+                self.await_consumed_messages(consumer)
+
+    def bounce_all_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+        for _ in range(num_bounces):
+            for node in consumer.nodes:
+                consumer.stop_node(node, clean_shutdown)
+
+            wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, timeout_sec=10,
+                       err_msg="Timed out waiting for the consumers to shutdown")
+            
+            for node in consumer.nodes:
+                consumer.start_node(node)
+
+            self.await_all_members(consumer)
+            self.await_consumed_messages(consumer)
+
+    def rolling_bounce_brokers(self, consumer, num_bounces=5, clean_shutdown=True):
+        for _ in range(num_bounces):
+            for node in self.kafka.nodes:
+                self.kafka.restart_node(node, clean_shutdown=True)
+                self.await_all_members(consumer)
+                self.await_consumed_messages(consumer)
+
+    def test_broker_rolling_bounce(self):
+        """
+        Verify correct consumer behavior when the brokers are consecutively restarted.
+
+        Setup: single Kafka cluster with one producer writing messages to a single topic with one
+        partition, an a set of consumers in the same group reading from the same topic.
+
+        - Start a producer which continues producing new messages throughout the test.
+        - Start up the consumers and wait until they've joined the group.
+        - In a loop, restart each broker consecutively, waiting for the group to stabilize between
+          each broker restart.
+        - Verify delivery semantics according to the failure type and that the broker bounces
+          did not cause unexpected group rebalances.
+        """
+        partition = TopicPartition(self.TOPIC, 0)
+        
+        producer = self.setup_producer(self.TOPIC)
+        consumer = self.setup_consumer(self.TOPIC)
+
+        producer.start()
+        self.await_produced_messages(producer)
+
+        consumer.start()
+        self.await_all_members(consumer)
+
+        num_rebalances = consumer.num_rebalances()
+        # TODO: make this test work with hard shutdowns, which probably requires
+        #       pausing before the node is restarted to ensure that any ephemeral
+        #       nodes have time to expire
+        self.rolling_bounce_brokers(consumer, clean_shutdown=True)
+        
+        unexpected_rebalances = consumer.num_rebalances() - num_rebalances
+        assert unexpected_rebalances == 0, \
+            "Broker rolling bounce caused %d unexpected group rebalances" % unexpected_rebalances
+
+        consumer.stop_all()
+
+        assert consumer.current_position(partition) == consumer.total_consumed(), \
+            "Total consumed records did not match consumed position"
+
+    @matrix(clean_shutdown=[True, False], bounce_mode=["all", "rolling"])
+    def test_consumer_bounce(self, clean_shutdown, bounce_mode):
+        """
+        Verify correct consumer behavior when the consumers in the group are consecutively restarted.
+
+        Setup: single Kafka cluster with one producer and a set of consumers in one group.
+
+        - Start a producer which continues producing new messages throughout the test.
+        - Start up the consumers and wait until they've joined the group.
+        - In a loop, restart each consumer, waiting for each one to rejoin the group before
+          restarting the rest.
+        - Verify delivery semantics according to the failure type.
+        """
+        partition = TopicPartition(self.TOPIC, 0)
+        
+        producer = self.setup_producer(self.TOPIC)
+        consumer = self.setup_consumer(self.TOPIC)
+
+        producer.start()
+        self.await_produced_messages(producer)
+
+        consumer.start()
+        self.await_all_members(consumer)
+
+        if bounce_mode == "all":
+            self.bounce_all_consumers(consumer, clean_shutdown=clean_shutdown)
+        else:
+            self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
+                
+        consumer.stop_all()
+        if clean_shutdown:
+            # if the total records consumed matches the current position, we haven't seen any duplicates
+            # this can only be guaranteed with a clean shutdown
+            assert consumer.current_position(partition) == consumer.total_consumed(), \
+                "Total consumed records did not match consumed position"
+        else:
+            # we may have duplicates in a hard failure
+            assert consumer.current_position(partition) <= consumer.total_consumed(), \
+                "Current position greater than the total number of consumed records"
+
+    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
+    def test_consumer_failure(self, clean_shutdown, enable_autocommit):
+        partition = TopicPartition(self.TOPIC, 0)
+        
+        consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
+        producer = self.setup_producer(self.TOPIC)
+
+        consumer.start()
+        self.await_all_members(consumer)
+
+        partition_owner = consumer.owner(partition)
+        assert partition_owner is not None
+
+        # startup the producer and ensure that some records have been written
+        producer.start()
+        self.await_produced_messages(producer)
+
+        # stop the partition owner and await its shutdown
+        consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown)
+        wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers - 1) and consumer.owner(partition) != None,
+                   timeout_sec=self.session_timeout_sec+5, err_msg="Timed out waiting for consumer to close")
+
+        # ensure that the remaining consumer does some work after rebalancing
+        self.await_consumed_messages(consumer, min_messages=1000)
+
+        consumer.stop_all()
+
+        if clean_shutdown:
+            # if the total records consumed matches the current position, we haven't seen any duplicates
+            # this can only be guaranteed with a clean shutdown
+            assert consumer.current_position(partition) == consumer.total_consumed(), \
+                "Total consumed records did not match consumed position"
+        else:
+            # we may have duplicates in a hard failure
+            assert consumer.current_position(partition) <= consumer.total_consumed(), \
+                "Current position greater than the total number of consumed records"
+
+        # if autocommit is not turned on, we can also verify the last committed offset
+        if not enable_autocommit:
+            assert consumer.last_commit(partition) == consumer.current_position(partition), \
+                "Last committed offset did not match last consumed position"
+
+
+    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
+    def test_broker_failure(self, clean_shutdown, enable_autocommit):
+        partition = TopicPartition(self.TOPIC, 0)
+        
+        consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
+        producer = self.setup_producer(self.TOPIC)
+
+        producer.start()
+        consumer.start()
+        self.await_all_members(consumer)
+
+        num_rebalances = consumer.num_rebalances()
+
+        # shutdown one of the brokers
+        # TODO: we need a way to target the coordinator instead of picking arbitrarily
+        self.kafka.signal_node(self.kafka.nodes[0], signal.SIGTERM if clean_shutdown else signal.SIGKILL)
+
+        # ensure that the consumers do some work after the broker failure
+        self.await_consumed_messages(consumer, min_messages=1000)
+
+        # verify that there were no rebalances on failover
+        assert num_rebalances == consumer.num_rebalances(), "Broker failure should not cause a rebalance"
+
+        consumer.stop_all()
+
+        # if the total records consumed matches the current position, we haven't seen any duplicates
+        assert consumer.current_position(partition) == consumer.total_consumed(), \
+            "Total consumed records did not match consumed position"
+
+        # if autocommit is not turned on, we can also verify the last committed offset
+        if not enable_autocommit:
+            assert consumer.last_commit(partition) == consumer.current_position(partition), \
+                "Last committed offset did not match last consumed position"
+
+    def test_group_consumption(self):
+        """
+        Verifies correct group rebalance behavior as consumers are started and stopped. 
+        In particular, this test verifies that the partition is readable after every
+        expected rebalance.
+
+        Setup: single Kafka cluster with a group of consumers reading from one topic
+        with one partition while the verifiable producer writes to it.
+
+        - Start the consumers one by one, verifying consumption after each rebalance
+        - Shutdown the consumers one by one, verifying consumption after each rebalance
+        """
+        consumer = self.setup_consumer(self.TOPIC)
+        producer = self.setup_producer(self.TOPIC)
+
+        partition = TopicPartition(self.TOPIC, 0)
+
+        producer.start()
+
+        for num_started, node in enumerate(consumer.nodes, 1):
+            consumer.start_node(node)
+            self.await_members(consumer, num_started)
+            self.await_consumed_messages(consumer)
+
+        for num_stopped, node in enumerate(consumer.nodes, 1):
+            consumer.stop_node(node)
+
+            if num_stopped < self.num_consumers:
+                self.await_members(consumer, self.num_consumers - num_stopped)
+                self.await_consumed_messages(consumer)
+
+        assert consumer.current_position(partition) == consumer.total_consumed(), \
+            "Total consumed records did not match consumed position"
+
+        assert consumer.last_commit(partition) == consumer.current_position(partition), \
+            "Last committed offset did not match last consumed position"
+
+
+class AssignmentValidationTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 6
+
+    def __init__(self, test_context):
+        super(AssignmentValidationTest, self).__init__(test_context, num_consumers=3, num_producers=0,
+                                                num_zk=1, num_brokers=2, topics={
+            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 },
+        })
+
+    @matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
+                                 "org.apache.kafka.clients.consumer.RoundRobinAssignor"])
+    def test_valid_assignment(self, assignment_strategy):
+        """
+        Verify assignment strategy correctness: each partition is assigned to exactly
+        one consumer instance.
+
+        Setup: single Kafka cluster with a set of consumers in the same group.
+
+        - Start the consumers one by one
+        - Validate assignment after every expected rebalance
+        """
+        consumer = self.setup_consumer(self.TOPIC, assignment_strategy=assignment_strategy)
+        for num_started, node in enumerate(consumer.nodes, 1):
+            consumer.start_node(node)
+            self.await_members(consumer, num_started)
+            assert self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment())
+            
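
A note on the assertion above: valid_assignment checks that every partition of the topic
is owned by exactly one group member after each rebalance. A minimal sketch of that kind
of check (illustrative only, not part of this patch; the shape of the assignment data is
an assumption) could look like:

    from collections import Counter

    def assignment_is_valid(topic, num_partitions, assignment):
        # assignment: dict mapping consumer id -> list of (topic, partition) tuples
        counts = Counter(tp for partitions in assignment.values() for tp in partitions)
        expected = set((topic, p) for p in range(num_partitions))
        # every partition is present, and none is owned by more than one consumer
        return set(counts) == expected and all(c == 1 for c in counts.values())

With six partitions spread over three consumers, both RangeAssignor and RoundRobinAssignor
give each member two partitions, so a check of this form passes after every rebalance in
test_valid_assignment.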

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/client/message_format_change_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/message_format_change_test.py b/tests/kafkatest/tests/client/message_format_change_test.py
new file mode 100644
index 0000000..357fd17
--- /dev/null
+++ b/tests/kafkatest/tests/client/message_format_change_test.py
@@ -0,0 +1,92 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.utils import is_int
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.services.kafka import config_property
+import time
+
+
+class MessageFormatChangeTest(ProduceConsumeValidateTest):
+
+    def __init__(self, test_context):
+        super(MessageFormatChangeTest, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+            
+        self.zk.start()
+
+        # Producer and consumer
+        self.producer_throughput = 10000
+        self.num_producers = 1
+        self.num_consumers = 1
+        self.messages_per_producer = 100
+
+    def produce_and_consume(self, producer_version, consumer_version, group):
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic,
+                                           throughput=self.producer_throughput,
+                                           message_validator=is_int,
+                                           version=KafkaVersion(producer_version))
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+                                        self.topic, consumer_timeout_ms=30000,
+                                        message_validator=is_int, version=KafkaVersion(consumer_version))
+        self.consumer.group_id = group
+        self.run_produce_consume_validate(lambda: wait_until(
+            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+            timeout_sec=120, backoff_sec=1,
+            err_msg="Producer did not produce all messages in reasonable amount of time"))
+        
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
+    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
+    def test_compatibility(self, producer_version, consumer_version):
+        """ This tests performs the following checks:
+        The workload is a mix of 0.9.x and 0.10.x producers and consumers 
+        that produce to and consume from a 0.10.x cluster
+        1. initially the topic is using message format 0.9.0
+        2. change the message format version for topic to 0.10.0 on the fly.
+        3. change the message format version for topic back to 0.9.0 on the fly.
+        - The producers and consumers should not have any issue.
+        - Note that for 0.9.x consumers/producers we only do steps 1 and 2
+        """
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
+                                                                    "partitions": 3,
+                                                                    "replication-factor": 3,
+                                                                    'configs': {"min.insync.replicas": 2}}})
+       
+        self.kafka.start()
+        self.logger.info("First format change to 0.9.0")
+        self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
+        self.produce_and_consume(producer_version, consumer_version, "group1")
+
+        self.logger.info("Second format change to 0.10.0")
+        self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
+        self.produce_and_consume(producer_version, consumer_version, "group2")
+
+        if producer_version == str(TRUNK) and consumer_version == str(TRUNK):
+            self.logger.info("Third format change back to 0.9.0")
+            self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
+            self.produce_and_consume(producer_version, consumer_version, "group3")
+
+
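
The on-the-fly format switches above go through kafka.alter_message_format, which boils
down to changing the topic-level message.format.version config. A hedged sketch of how
such a change could be driven from Python (the kafka-configs.sh invocation, paths, and
ZooKeeper address are assumptions, not taken from the service code):

    import subprocess

    def alter_message_format(topic, version, zk_connect="localhost:2181",
                             kafka_home="/opt/kafka"):
        # set message.format.version on the topic; brokers apply it without a restart
        subprocess.check_call([
            "%s/bin/kafka-configs.sh" % kafka_home,
            "--zookeeper", zk_connect,
            "--entity-type", "topics", "--entity-name", topic,
            "--alter", "--add-config", "message.format.version=%s" % version,
        ])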

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/client/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
new file mode 100644
index 0000000..7c2ec59
--- /dev/null
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark import parametrize
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.performance import ProducerPerformanceService
+from kafkatest.services.console_consumer import ConsoleConsumer
+
+
+class QuotaTest(Test):
+    """
+    These tests verify that quotas provide the expected functionality -- they run a
+    producer, broker, and consumer with different clientId and quota configurations
+    and check that the observed throughput is close to the expected value.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(QuotaTest, self).__init__(test_context=test_context)
+
+        self.topic = 'test_topic'
+        self.logger.info('use topic ' + self.topic)
+
+        # quota related parameters
+        self.quota_config = {'quota_producer_default': 2500000,
+                             'quota_consumer_default': 2000000,
+                             'quota_producer_bytes_per_second_overrides': 'overridden_id=3750000',
+                             'quota_consumer_bytes_per_second_overrides': 'overridden_id=3000000'}
+        self.maximum_client_deviation_percentage = 100.0
+        self.maximum_broker_deviation_percentage = 5.0
+        self.num_records = 100000
+        self.record_size = 3000
+
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
+                                  security_protocol='PLAINTEXT',
+                                  interbroker_security_protocol='PLAINTEXT',
+                                  topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}},
+                                  quota_config=self.quota_config,
+                                  jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
+                                                    'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],
+                                  jmx_attributes=['OneMinuteRate'])
+        self.num_producers = 1
+        self.num_consumers = 2
+
+    def setUp(self):
+        self.zk.start()
+        self.kafka.start()
+
+    def min_cluster_size(self):
+        """Override this since we're adding services outside of the constructor"""
+        return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+    @parametrize(producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1)
+    @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=1)
+    @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=2)
+    def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1):
+        # Produce all messages
+        producer = ProducerPerformanceService(
+            self.test_context, producer_num, self.kafka,
+            topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_id,
+            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id], jmx_attributes=['outgoing-byte-rate'])
+
+        producer.run()
+
+        # Consume all messages
+        consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
+            new_consumer=False,
+            consumer_timeout_ms=60000, client_id=consumer_id,
+            jmx_object_names=['kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s' % consumer_id],
+            jmx_attributes=['OneMinuteRate'])
+        consumer.run()
+
+        for idx, messages in consumer.messages_consumed.iteritems():
+            assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx
+
+        success, msg = self.validate(self.kafka, producer, consumer)
+        assert success, msg
+
+    def validate(self, broker, producer, consumer):
+        """
+        For each client_id we validate that:
+        1) number of consumed messages equals number of produced messages
+        2) maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
+        3) maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
+        4) maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
+        5) maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
+        """
+        success = True
+        msg = ''
+
+        self.kafka.read_jmx_output_all_nodes()
+
+        # validate that number of consumed messages equals number of produced messages
+        produced_num = sum([value['records'] for value in producer.results])
+        consumed_num = sum([len(value) for value in consumer.messages_consumed.values()])
+        self.logger.info('producer produced %d messages' % produced_num)
+        self.logger.info('consumer consumed %d messages' % consumed_num)
+        if produced_num != consumed_num:
+            success = False
+            msg += "number of produced messages %d doesn't equal number of consumed messages %d" % (produced_num, consumed_num)
+
+        # validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
+        producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % producer.client_id
+        producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
+        producer_quota_bps = self.get_producer_quota(producer.client_id)
+        self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps))
+        if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum producer throughput %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
+                   (producer_maximum_bps, producer_quota_bps, self.maximum_client_deviation_percentage)
+
+        # validate that maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
+        broker_byte_in_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:OneMinuteRate'
+        broker_maximum_byte_in_bps = broker.maximum_jmx_value[broker_byte_in_attribute_name]
+        self.logger.info('broker has maximum byte-in rate %.2f bps with producer quota %.2f bps' %
+                         (broker_maximum_byte_in_bps, producer_quota_bps))
+        if broker_maximum_byte_in_bps > producer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum broker byte-in rate %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
+                   (broker_maximum_byte_in_bps, producer_quota_bps, self.maximum_broker_deviation_percentage)
+
+        # validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
+        consumer_attribute_name = 'kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s:OneMinuteRate' % consumer.client_id
+        consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name]
+        consumer_quota_bps = self.get_consumer_quota(consumer.client_id)
+        self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f bps' % (consumer_maximum_bps, consumer_quota_bps))
+        if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum consumer throughput %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
+                   (consumer_maximum_bps, consumer_quota_bps, self.maximum_client_deviation_percentage)
+
+        # validate that maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
+        broker_byte_out_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec:OneMinuteRate'
+        broker_maximum_byte_out_bps = broker.maximum_jmx_value[broker_byte_out_attribute_name]
+        self.logger.info('broker has maximum byte-out rate %.2f bps with consumer quota %.2f bps' %
+                         (broker_maximum_byte_out_bps, consumer_quota_bps))
+        if broker_maximum_byte_out_bps > consumer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum broker byte-out rate %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
+                   (broker_maximum_byte_out_bps, consumer_quota_bps, self.maximum_broker_deviation_percentage)
+
+        return success, msg
+
+    def get_producer_quota(self, client_id):
+        overridden_quotas = {value.split('=')[0]:value.split('=')[1] for value in self.quota_config['quota_producer_bytes_per_second_overrides'].split(',')}
+        if client_id in overridden_quotas:
+            return float(overridden_quotas[client_id])
+        return self.quota_config['quota_producer_default']
+
+    def get_consumer_quota(self, client_id):
+        overridden_quotas = {value.split('=')[0]:value.split('=')[1] for value in self.quota_config['quota_consumer_bytes_per_second_overrides'].split(',')}
+        if client_id in overridden_quotas:
+            return float(overridden_quotas[client_id])
+        return self.quota_config['quota_consumer_default']
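
All five checks in validate() above reduce to the same bounds test: an observed byte rate
may exceed its quota only by the configured deviation percentage. A small illustrative
sketch (the function name and figures below are not from the patch):

    def within_quota(observed_bps, quota_bps, max_deviation_pct):
        return observed_bps <= quota_bps * (1 + max_deviation_pct / 100.0)

    # With the default producer quota of 2500000 bytes/sec and the 100% client-side
    # deviation allowed above, up to 5000000 bytes/sec still passes; the broker-side
    # check is much tighter at 5%.
    assert within_quota(4900000, 2500000, 100.0)
    assert within_quota(2600000, 2500000, 5.0)
    assert not within_quota(2700000, 2500000, 5.0)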

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/compatibility_test_new_broker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/compatibility_test_new_broker_test.py
deleted file mode 100644
index 2c261df..0000000
--- a/tests/kafkatest/tests/compatibility_test_new_broker_test.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# Copyright 2015 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.tests.test import Test
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.utils import is_int
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.services.kafka import config_property
-
-# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
-class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
-
-    def __init__(self, test_context):
-        super(ClientCompatibilityTestNewBroker, self).__init__(test_context=test_context)
-
-    def setUp(self):
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-            
-        self.zk.start()
-
-        # Producer and consumer
-        self.producer_throughput = 10000
-        self.num_producers = 1
-        self.num_consumers = 1
-        self.messages_per_producer = 1000
-
-    @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], timestamp_type=None)
-    @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None)
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=None)
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None)
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=None)
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("CreateTime"))
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime"))
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime"))
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=str("LogAppendTime"))
-    def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=False, timestamp_type=None):
-       
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
-                                                                    "partitions": 3,
-                                                                    "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}})
-        for node in self.kafka.nodes:
-            if timestamp_type is not None:
-                node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
-        self.kafka.start()
-         
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
-                                           self.topic, throughput=self.producer_throughput,
-                                           message_validator=is_int,
-                                           compression_types=compression_types,
-                                           version=KafkaVersion(producer_version))
-
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
-                                        self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
-                                        message_validator=is_int, version=KafkaVersion(consumer_version))
-
-        self.run_produce_consume_validate(lambda: wait_until(
-            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
-            timeout_sec=120, backoff_sec=1,
-            err_msg="Producer did not produce all messages in reasonable amount of time"))

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/compression_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/compression_test.py b/tests/kafkatest/tests/compression_test.py
deleted file mode 100644
index 0de53ae..0000000
--- a/tests/kafkatest/tests/compression_test.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int_with_prefix
-
-class CompressionTest(ProduceConsumeValidateTest):
-    """
-    These tests validate produce / consume for compressed topics.
-    """
-
-    def __init__(self, test_context):
-        """:type test_context: ducktape.tests.test.TestContext"""
-        super(CompressionTest, self).__init__(test_context=test_context)
-
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
-                                                                    "partitions": 10,
-                                                                    "replication-factor": 1}})
-        self.num_partitions = 10
-        self.timeout_sec = 60
-        self.producer_throughput = 1000
-        self.num_producers = 4
-        self.messages_per_producer = 1000
-        self.num_consumers = 1
-
-    def setUp(self):
-        self.zk.start()
-
-    def min_cluster_size(self):
-        # Override this since we're adding services outside of the constructor
-        return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers
-
-    @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=True)
-    @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=False)
-    def test_compressed_topic(self, compression_types, new_consumer):
-        """Test produce => consume => validate for compressed topics
-        Setup: 1 zk, 1 kafka node, 1 topic with partitions=10, replication-factor=1
-
-        compression_types parameter gives a list of compression types (or no compression if
-        "none"). Each producer in a VerifiableProducer group (num_producers = 4) will use a
-        compression type from the list based on producer's index in the group.
-
-            - Produce messages in the background
-            - Consume messages in the background
-            - Stop producing, and finish consuming
-            - Validate that every acked message was consumed
-        """
-
-        self.kafka.security_protocol = "PLAINTEXT"
-        self.kafka.interbroker_security_protocol = self.kafka.security_protocol
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
-                                           self.topic, throughput=self.producer_throughput,
-                                           message_validator=is_int_with_prefix,
-                                           compression_types=compression_types)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic,
-                                        new_consumer=new_consumer, consumer_timeout_ms=60000,
-                                        message_validator=is_int_with_prefix)
-        self.kafka.start()
-
-        self.run_produce_consume_validate(lambda: wait_until(
-            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
-            timeout_sec=120, backoff_sec=1,
-            err_msg="Producer did not produce all messages in reasonable amount of time"))
-
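
As the compression test's docstring explains, each worker in the VerifiableProducer group
picks its compression codec from the compression_types list based on its index. A hedged
illustration of that mapping (the indexing scheme is inferred from the docstring, not
taken from the service code):

    compression_types = ["snappy", "gzip", "lz4", "none"]
    num_producers = 4

    # worker i uses compression_types[i], so four workers cover all four codecs
    for worker_idx in range(num_producers):
        codec = compression_types[worker_idx % len(compression_types)]
        print("producer %d -> compression.type=%s" % (worker_idx, codec))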

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/connect/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/__init__.py b/tests/kafkatest/tests/connect/__init__.py
new file mode 100644
index 0000000..ebc9bb3
--- /dev/null
+++ b/tests/kafkatest/tests/connect/__init__.py
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults