You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/11/29 17:11:58 UTC

[4/4] kafka git commit: Revert "KAFKA-4345; Run decktape test for each pull request"

Revert "KAFKA-4345; Run decktape test for each pull request"

This reverts commit e035fc039598127e88f31739458f705290b1fdba for the
following reasons:

1. License files are missing causing local builds to fail during the
rat task (rat is not being run in Jenkins for some reason, filed
KAFKA-4459 for that)
2. It renames a number of system test files when there's a better
way to achieve the goal of running a subset of system tests to stay
under the Travis limit.
3. It adds the gradle wrapper binary even though this was removed
intentionally a while back.

A new PR will be submitted for KAFKA-4345 without the undesired
changes.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #2187 from ijuma/kafka-4345-revert


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a5d28149
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a5d28149
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a5d28149

Branch: refs/heads/trunk
Commit: a5d28149fb67ab68d9de7d8de63c0de93b80a82c
Parents: 3e3b7a0
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Nov 29 09:11:21 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Nov 29 09:11:21 2016 -0800

----------------------------------------------------------------------
 .travis.yml                                     |  48 ---
 gradle/wrapper/gradle-wrapper.jar               | Bin 52928 -> 0 bytes
 gradle/wrapper/gradle-wrapper.properties        |   6 -
 tests/README.md                                 |  37 ---
 tests/cluster_file.json                         |  97 ------
 tests/kafkatest/directory_layout/kafka_path.py  |  10 +-
 tests/kafkatest/services/mirror_maker.py        |   4 +-
 tests/kafkatest/tests/client/__init__.py        |  14 +
 .../kafkatest/tests/client/compression_test.py  |  85 ++++++
 .../client/consumer_rolling_upgrade_test.py     |  82 +++++
 tests/kafkatest/tests/client/consumer_test.py   | 297 +++++++++++++++++++
 .../tests/client/message_format_change_test.py  |  90 ++++++
 tests/kafkatest/tests/client/quota_test.py      | 219 ++++++++++++++
 tests/kafkatest/tests/client1/__init__.py       |   0
 .../client1/consumer_rolling_upgrade_test.py    |  82 -----
 .../tests/client1/message_format_change_test.py |  90 ------
 tests/kafkatest/tests/client1/quota_test.py     | 219 --------------
 tests/kafkatest/tests/client2/__init__.py       |  14 -
 .../kafkatest/tests/client2/compression_test.py |  85 ------
 tests/kafkatest/tests/client2/consumer_test.py  | 297 -------------------
 tests/kafkatest/tests/core/__init__.py          |  14 +
 .../core/compatibility_test_new_broker_test.py  |  80 +++++
 .../tests/core/consumer_group_command_test.py   | 106 +++++++
 .../tests/core/get_offset_shell_test.py         |  91 ++++++
 tests/kafkatest/tests/core/mirror_maker_test.py | 179 +++++++++++
 .../tests/core/reassign_partitions_test.py      | 110 +++++++
 tests/kafkatest/tests/core/replication_test.py  | 154 ++++++++++
 .../tests/core/security_rolling_upgrade_test.py | 190 ++++++++++++
 tests/kafkatest/tests/core/security_test.py     | 106 +++++++
 .../tests/core/simple_consumer_shell_test.py    |  75 +++++
 tests/kafkatest/tests/core/throttling_test.py   | 173 +++++++++++
 tests/kafkatest/tests/core/upgrade_test.py      | 128 ++++++++
 .../core/zookeeper_security_upgrade_test.py     | 115 +++++++
 tests/kafkatest/tests/core1/__init__.py         |   0
 .../tests/core1/consumer_group_command_test.py  | 106 -------
 .../tests/core1/get_offset_shell_test.py        |  91 ------
 .../tests/core1/reassign_partitions_test.py     | 110 -------
 .../tests/core1/simple_consumer_shell_test.py   |  75 -----
 tests/kafkatest/tests/core1/throttling_test.py  | 173 -----------
 tests/kafkatest/tests/core2/__init__.py         |  14 -
 .../core2/compatibility_test_new_broker_test.py |  80 -----
 tests/kafkatest/tests/mirror_maker/__init__.py  |   0
 .../tests/mirror_maker/mirror_maker_test.py     | 179 -----------
 .../kafkatest/tests/produce_consume_validate.py |   3 +-
 tests/kafkatest/tests/replication/__init__.py   |   0
 .../tests/replication/replication_test.py       | 154 ----------
 tests/kafkatest/tests/security1/__init__.py     |   0
 .../kafkatest/tests/security1/security_test.py  | 106 -------
 .../zookeeper_security_upgrade_test.py          | 115 -------
 tests/kafkatest/tests/security2/__init__.py     |   0
 .../security2/security_rolling_upgrade_test.py  | 190 ------------
 tests/kafkatest/tests/upgrade/__init__.py       |   0
 tests/kafkatest/tests/upgrade/upgrade_test.py   | 128 --------
 tests/travis/Dockerfile                         |  38 ---
 tests/travis/run_tests.sh                       |  58 ----
 tests/travis/ssh/authorized_keys                |  15 -
 tests/travis/ssh/config                         |  21 --
 tests/travis/ssh/id_rsa                         |  27 --
 tests/travis/ssh/id_rsa.pub                     |   1 -
 59 files changed, 2316 insertions(+), 2665 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index fa3ab0b..0000000
--- a/.travis.yml
+++ /dev/null
@@ -1,48 +0,0 @@
-#  Licensed under the Apache License, Version 2.0 (the "License");
-#  you may not use this file except in compliance with the License.
-#  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-
-sudo: required
-dist: trusty
-language: java
-
-# TODO enable failing splits after they have been stablized
-env:
-  - TC_PATHS="tests/kafkatest/tests/client1"
-  - TC_PATHS="tests/kafkatest/tests/client2"
-# - TC_PATHS="tests/kafkatest/tests/connect tests/kafkatest/tests/streams tests/kafkatest/tests/tools"
-# - TC_PATHS="tests/kafkatest/tests/mirror_maker"
-# - TC_PATHS="tests/kafkatest/tests/replication"
-# - TC_PATHS="tests/kafkatest/tests/upgrade"
-  - TC_PATHS="tests/kafkatest/tests/security1"
-# - TC_PATHS="tests/kafkatest/tests/security2"
-# - TC_PATHS="tests/kafkatest/tests/core1"
-  - TC_PATHS="tests/kafkatest/tests/core2"
-
-jdk:
-  - oraclejdk8
-
-before_install:
-
-script:
-  - ./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh
-
-services:
-  - docker
-
-before_cache:
-  - rm -f  $HOME/.gradle/caches/modules-2/modules-2.lock
-  - rm -fr $HOME/.gradle/caches/*/plugin-resolution/
-cache:
-  directories:
-    - "$HOME/.m2/repository"
-    - "$HOME/.gradle/caches/"
-    - "$HOME/.gradle/wrapper/"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/gradle/wrapper/gradle-wrapper.jar
----------------------------------------------------------------------
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
deleted file mode 100644
index 6ffa237..0000000
Binary files a/gradle/wrapper/gradle-wrapper.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/gradle/wrapper/gradle-wrapper.properties
----------------------------------------------------------------------
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
deleted file mode 100644
index cde46f5..0000000
--- a/gradle/wrapper/gradle-wrapper.properties
+++ /dev/null
@@ -1,6 +0,0 @@
-#Fri Oct 07 16:09:33 PDT 2016
-distributionBase=GRADLE_USER_HOME
-distributionPath=wrapper/dists
-zipStoreBase=GRADLE_USER_HOME
-zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-3.0-bin.zip

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/README.md
----------------------------------------------------------------------
diff --git a/tests/README.md b/tests/README.md
index 9056b99..098922f 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -6,43 +6,6 @@ This directory contains Kafka system integration and performance tests.
 (ducktape is a distributed testing framework which provides test runner,
 result reporter and utilities to pull up and tear down services.)
 
-Running tests using docker
---------------------------
-Docker is used for running kafka system tests on travis-ci. And exactly same setup can be run for development purposes.
-
-* Run all tests
-```
-bash tests/travis/run_tests.sh
-```
-* Run all tests with debug on (warning will produce log of logs)
-```
-_DUCKTAPE_OPTIONS="--debug" bash tests/travis/run_tests.sh | tee debug_logs.txt
-```
-* Run a subset of tests
-```
-TC_PATHS="tests/kafkatest/tests/streams tests/kafkatest/tests/tools" bash tests/travis/run_tests.sh
-```
-
-Examining CI run
-----------------
-* Set BUILD_ID is travis ci's build id. E.g. build id is 169519874 for the following build
-```
-https://travis-ci.org/raghavgautam/kafka/builds/169519874
-```
-
-* Getting number of tests that were actually run
-```
-for id in $(curl -sSL https://api.travis-ci.org/builds/$BUILD_ID | jq '.matrix|map(.id)|.[]'); do curl -sSL "https://api.travis-ci.org/jobs/$id/log.txt?deansi=true" ; done | egrep 'SerialTestRunner.*setting up' | wc
-```
-* Getting number of tests that passed
-```
-for id in $(curl -sSL https://api.travis-ci.org/builds/$BUILD_ID | jq '.matrix|map(.id)|.[]'); do curl -sSL "https://api.travis-ci.org/jobs/$id/log.txt?deansi=true" ; done | egrep 'SerialTestRunner.*PASS' | wc
-```
-* Getting all the logs produced from a run
-```
-for id in $(curl -sSL https://api.travis-ci.org/builds/169519874 | jq '.matrix|map(.id)|.[]'); do curl -sSL "https://api.travis-ci.org/jobs/$id/log.txt?deansi=true" ; done
-```
-
 Local Quickstart
 ----------------
 This quickstart will help you run the Kafka system tests on your local machine. Note this requires bringing up a cluster of virtual machines on your local computer, which is memory intensive; it currently requires around 10G RAM.

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/cluster_file.json
----------------------------------------------------------------------
diff --git a/tests/cluster_file.json b/tests/cluster_file.json
deleted file mode 100644
index dbf3f04..0000000
--- a/tests/cluster_file.json
+++ /dev/null
@@ -1,97 +0,0 @@
-{
-  "_comment": [
-    "Licensed to the Apache Software Foundation (ASF) under one or more",
-    "contributor license agreements.  See the NOTICE file distributed with",
-    "this work for additional information regarding copyright ownership.",
-    "The ASF licenses this file to You under the Apache License, Version 2.0",
-    "(the \"License\"); you may not use this file except in compliance with",
-    "the License.  You may obtain a copy of the License at",
-    "",
-    "http://www.apache.org/licenses/LICENSE-2.0",
-    "",
-    "Unless required by applicable law or agreed to in writing, software",
-    "distributed under the License is distributed on an \"AS IS\" BASIS,",
-    "WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.",
-    "See the License for the specific language governing permissions and",
-    "limitations under the License."
-  ],
-  "nodes": [
-    {
-      "hostname": "knode02.knw",
-      "user": "root",
-      "ssh_args": "",
-      "ssh_hostname": "",
-      "externally_routable_ip": "knode02.knw"
-    },
-    {
-      "hostname": "knode03.knw",
-      "user": "root",
-      "ssh_args": "",
-      "ssh_hostname": "",
-      "externally_routable_ip": "knode03.knw"
-    },
-    {
-      "hostname": "knode04.knw",
-      "user": "root",
-      "ssh_args": "",
-      "ssh_hostname": "",
-      "externally_routable_ip": "knode04.knw"
-    },
-    {
-      "hostname": "knode05.knw",
-      "user": "root",
-      "ssh_args": "",
-      "ssh_hostname": "",
-      "externally_routable_ip": "knode05.knw"
-    },
-    {
-      "hostname": "knode06.knw",
-      "user": "root",
-      "ssh_args": "",
-      "ssh_hostname": "",
-      "externally_routable_ip": "knode06.knw"
-    },
-    {
-      "hostname": "knode07.knw",
-      "user": "root",
-      "ssh_args": "",
-      "ssh_hostname": "",
-      "externally_routable_ip": "knode07.knw"
-    },
-    {
-      "hostname": "knode08.knw",
-      "user": "root",
-      "ssh_args": "",
-      "ssh_hostname": "",
-      "externally_routable_ip": "knode08.knw"
-    },
-    {
-      "hostname": "knode09.knw",
-      "user": "root",
-      "ssh_args": "",
-      "ssh_hostname": "",
-      "externally_routable_ip": "knode09.knw"
-    },
-    {
-      "hostname": "knode10.knw",
-      "user": "root",
-      "ssh_args": "",
-      "ssh_hostname": "",
-      "externally_routable_ip": "knode10.knw"
-    },
-    {
-      "hostname": "knode11.knw",
-      "user": "root",
-      "ssh_args": "",
-      "ssh_hostname": "",
-      "externally_routable_ip": "knode11.knw"
-    },
-    {
-      "hostname": "knode12.knw",
-      "user": "root",
-      "ssh_args": "",
-      "ssh_hostname": "",
-      "externally_routable_ip": "knode12.knw"
-    }
-  ]
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/directory_layout/kafka_path.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/directory_layout/kafka_path.py b/tests/kafkatest/directory_layout/kafka_path.py
index 9688174..0e60aff 100644
--- a/tests/kafkatest/directory_layout/kafka_path.py
+++ b/tests/kafkatest/directory_layout/kafka_path.py
@@ -44,11 +44,11 @@ TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME = "tools-dependant-libs"
 
 JARS = {
     "trunk": {
-        CORE_JAR_NAME: "libs/*.jar",
-        CORE_LIBS_JAR_NAME: "libs/*.jar",
-        CORE_DEPENDANT_TEST_LIBS_JAR_NAME: "libs/*.jar",
-        TOOLS_JAR_NAME: "libs/*.jar",
-        TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME: "libs/*.jar"
+        CORE_JAR_NAME: "core/build/*/*.jar",
+        CORE_LIBS_JAR_NAME: "core/build/libs/*.jar",
+        CORE_DEPENDANT_TEST_LIBS_JAR_NAME: "core/build/dependant-testlibs/*.jar",
+        TOOLS_JAR_NAME: "tools/build/libs/kafka-tools*.jar",
+        TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME: "tools/build/dependant-libs*/*.jar"
     }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
index 626a0ff..14af4cf 100644
--- a/tests/kafkatest/services/mirror_maker.py
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -180,13 +180,13 @@ class MirrorMaker(KafkaPathResolverMixin, Service):
         cmd = self.start_cmd(node)
         self.logger.debug("Mirror maker command: %s", cmd)
         node.account.ssh(cmd, allow_fail=False)
-        wait_until(lambda: self.alive(node), timeout_sec=30, backoff_sec=.5,
+        wait_until(lambda: self.alive(node), timeout_sec=10, backoff_sec=.5,
                    err_msg="Mirror maker took to long to start.")
         self.logger.debug("Mirror maker is alive")
 
     def stop_node(self, node, clean_shutdown=True):
         node.account.kill_process("java", allow_fail=True, clean_shutdown=clean_shutdown)
-        wait_until(lambda: not self.alive(node), timeout_sec=30, backoff_sec=.5,
+        wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.5,
                    err_msg="Mirror maker took to long to stop.")
 
     def clean_node(self, node):

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/__init__.py b/tests/kafkatest/tests/client/__init__.py
new file mode 100644
index 0000000..ec20143
--- /dev/null
+++ b/tests/kafkatest/tests/client/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client/compression_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/compression_test.py b/tests/kafkatest/tests/client/compression_test.py
new file mode 100644
index 0000000..0de53ae
--- /dev/null
+++ b/tests/kafkatest/tests/client/compression_test.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int_with_prefix
+
+class CompressionTest(ProduceConsumeValidateTest):
+    """
+    These tests validate produce / consume for compressed topics.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(CompressionTest, self).__init__(test_context=test_context)
+
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
+                                                                    "partitions": 10,
+                                                                    "replication-factor": 1}})
+        self.num_partitions = 10
+        self.timeout_sec = 60
+        self.producer_throughput = 1000
+        self.num_producers = 4
+        self.messages_per_producer = 1000
+        self.num_consumers = 1
+
+    def setUp(self):
+        self.zk.start()
+
+    def min_cluster_size(self):
+        # Override this since we're adding services outside of the constructor
+        return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+    @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=True)
+    @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=False)
+    def test_compressed_topic(self, compression_types, new_consumer):
+        """Test produce => consume => validate for compressed topics
+        Setup: 1 zk, 1 kafka node, 1 topic with partitions=10, replication-factor=1
+
+        compression_types parameter gives a list of compression types (or no compression if
+        "none"). Each producer in a VerifiableProducer group (num_producers = 4) will use a
+        compression type from the list based on producer's index in the group.
+
+            - Produce messages in the background
+            - Consume messages in the background
+            - Stop producing, and finish consuming
+            - Validate that every acked message was consumed
+        """
+
+        self.kafka.security_protocol = "PLAINTEXT"
+        self.kafka.interbroker_security_protocol = self.kafka.security_protocol
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic, throughput=self.producer_throughput,
+                                           message_validator=is_int_with_prefix,
+                                           compression_types=compression_types)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic,
+                                        new_consumer=new_consumer, consumer_timeout_ms=60000,
+                                        message_validator=is_int_with_prefix)
+        self.kafka.start()
+
+        self.run_produce_consume_validate(lambda: wait_until(
+            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+            timeout_sec=120, backoff_sec=1,
+            err_msg="Producer did not produce all messages in reasonable amount of time"))
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
new file mode 100644
index 0000000..3cd3c7c
--- /dev/null
+++ b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition
+
+class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 4
+    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
+    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
+
+    def __init__(self, test_context):
+        super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_consumers=2, num_producers=0,
+                                                         num_zk=1, num_brokers=1, topics={
+            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }
+        })
+
+    def _verify_range_assignment(self, consumer):
+        # range assignment should give us two partition sets: (0, 1) and (2, 3)
+        assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()])
+        assert assignment == set([
+            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]),
+            frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])])
+
+    def _verify_roundrobin_assignment(self, consumer):
+        assignment = set([frozenset(x) for x in consumer.current_assignment().values()])
+        assert assignment == set([
+            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
+            frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
+
+    def rolling_update_test(self):
+        """
+        Verify rolling updates of partition assignment strategies works correctly. In this
+        test, we use a rolling restart to change the group's assignment strategy from "range" 
+        to "roundrobin." We verify after every restart that all members are still in the group
+        and that the correct assignment strategy was used.
+        """
+
+        # initialize the consumer using range assignment
+        consumer = self.setup_consumer(self.TOPIC, assignment_strategy=self.RANGE)
+
+        consumer.start()
+        self.await_all_members(consumer)
+        self._verify_range_assignment(consumer)
+
+        # change consumer configuration to prefer round-robin assignment, but still support range assignment
+        consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE
+
+        # restart one of the nodes and verify that we are still using range assignment
+        consumer.stop_node(consumer.nodes[0])
+        consumer.start_node(consumer.nodes[0])
+        self.await_all_members(consumer)
+        self._verify_range_assignment(consumer)
+        
+        # now restart the other node and verify that we have switched to round-robin
+        consumer.stop_node(consumer.nodes[1])
+        consumer.start_node(consumer.nodes[1])
+        self.await_all_members(consumer)
+        self._verify_roundrobin_assignment(consumer)
+
+        # if we want, we can now drop support for range assignment
+        consumer.assignment_strategy = self.ROUND_ROBIN
+        for node in consumer.nodes:
+            consumer.stop_node(node)
+            consumer.start_node(node)
+            self.await_all_members(consumer)
+            self._verify_roundrobin_assignment(consumer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client/consumer_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
new file mode 100644
index 0000000..534f65c
--- /dev/null
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition
+
+import signal
+
+class OffsetValidationTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 1
+
+    def __init__(self, test_context):
+        super(OffsetValidationTest, self).__init__(test_context, num_consumers=3, num_producers=1,
+                                                     num_zk=1, num_brokers=2, topics={
+            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 2 }
+        })
+
+    def rolling_bounce_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+        for _ in range(num_bounces):
+            for node in consumer.nodes:
+                consumer.stop_node(node, clean_shutdown)
+
+                wait_until(lambda: len(consumer.dead_nodes()) == 1,
+                           timeout_sec=self.session_timeout_sec+5,
+                           err_msg="Timed out waiting for the consumer to shutdown")
+
+                consumer.start_node(node)
+
+                self.await_all_members(consumer)
+                self.await_consumed_messages(consumer)
+
+    def bounce_all_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+        for _ in range(num_bounces):
+            for node in consumer.nodes:
+                consumer.stop_node(node, clean_shutdown)
+
+            wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, timeout_sec=10,
+                       err_msg="Timed out waiting for the consumers to shutdown")
+            
+            for node in consumer.nodes:
+                consumer.start_node(node)
+
+            self.await_all_members(consumer)
+            self.await_consumed_messages(consumer)
+
+    def rolling_bounce_brokers(self, consumer, num_bounces=5, clean_shutdown=True):
+        for _ in range(num_bounces):
+            for node in self.kafka.nodes:
+                self.kafka.restart_node(node, clean_shutdown=True)
+                self.await_all_members(consumer)
+                self.await_consumed_messages(consumer)
+
+    def setup_consumer(self, topic, **kwargs):
+        # collect verifiable consumer events since this makes debugging much easier
+        consumer = super(OffsetValidationTest, self).setup_consumer(topic, **kwargs)
+        self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
+        return consumer
+
+    def test_broker_rolling_bounce(self):
+        """
+        Verify correct consumer behavior when the brokers are consecutively restarted.
+
+        Setup: single Kafka cluster with one producer writing messages to a single topic with one
+        partition, an a set of consumers in the same group reading from the same topic.
+
+        - Start a producer which continues producing new messages throughout the test.
+        - Start up the consumers and wait until they've joined the group.
+        - In a loop, restart each broker consecutively, waiting for the group to stabilize between
+          each broker restart.
+        - Verify delivery semantics according to the failure type and that the broker bounces
+          did not cause unexpected group rebalances.
+        """
+        partition = TopicPartition(self.TOPIC, 0)
+        
+        producer = self.setup_producer(self.TOPIC)
+        consumer = self.setup_consumer(self.TOPIC)
+
+        producer.start()
+        self.await_produced_messages(producer)
+
+        consumer.start()
+        self.await_all_members(consumer)
+
+        num_rebalances = consumer.num_rebalances()
+        # TODO: make this test work with hard shutdowns, which probably requires
+        #       pausing before the node is restarted to ensure that any ephemeral
+        #       nodes have time to expire
+        self.rolling_bounce_brokers(consumer, clean_shutdown=True)
+        
+        unexpected_rebalances = consumer.num_rebalances() - num_rebalances
+        assert unexpected_rebalances == 0, \
+            "Broker rolling bounce caused %d unexpected group rebalances" % unexpected_rebalances
+
+        consumer.stop_all()
+
+        assert consumer.current_position(partition) == consumer.total_consumed(), \
+            "Total consumed records did not match consumed position"
+
+    @matrix(clean_shutdown=[True, False], bounce_mode=["all", "rolling"])
+    def test_consumer_bounce(self, clean_shutdown, bounce_mode):
+        """
+        Verify correct consumer behavior when the consumers in the group are consecutively restarted.
+
+        Setup: single Kafka cluster with one producer and a set of consumers in one group.
+
+        - Start a producer which continues producing new messages throughout the test.
+        - Start up the consumers and wait until they've joined the group.
+        - In a loop, restart each consumer, waiting for each one to rejoin the group before
+          restarting the rest.
+        - Verify delivery semantics according to the failure type.
+        """
+        partition = TopicPartition(self.TOPIC, 0)
+        
+        producer = self.setup_producer(self.TOPIC)
+        consumer = self.setup_consumer(self.TOPIC)
+
+        producer.start()
+        self.await_produced_messages(producer)
+
+        consumer.start()
+        self.await_all_members(consumer)
+
+        if bounce_mode == "all":
+            self.bounce_all_consumers(consumer, clean_shutdown=clean_shutdown)
+        else:
+            self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
+                
+        consumer.stop_all()
+        if clean_shutdown:
+            # if the total records consumed matches the current position, we haven't seen any duplicates
+            # this can only be guaranteed with a clean shutdown
+            assert consumer.current_position(partition) == consumer.total_consumed(), \
+                "Total consumed records did not match consumed position"
+        else:
+            # we may have duplicates in a hard failure
+            assert consumer.current_position(partition) <= consumer.total_consumed(), \
+                "Current position greater than the total number of consumed records"
+
+    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
+    def test_consumer_failure(self, clean_shutdown, enable_autocommit):
+        partition = TopicPartition(self.TOPIC, 0)
+        
+        consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
+        producer = self.setup_producer(self.TOPIC)
+
+        consumer.start()
+        self.await_all_members(consumer)
+
+        partition_owner = consumer.owner(partition)
+        assert partition_owner is not None
+
+        # startup the producer and ensure that some records have been written
+        producer.start()
+        self.await_produced_messages(producer)
+
+        # stop the partition owner and await its shutdown
+        consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown)
+        wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers - 1) and consumer.owner(partition) != None,
+                   timeout_sec=self.session_timeout_sec+5, err_msg="Timed out waiting for consumer to close")
+
+        # ensure that the remaining consumer does some work after rebalancing
+        self.await_consumed_messages(consumer, min_messages=1000)
+
+        consumer.stop_all()
+
+        if clean_shutdown:
+            # if the total records consumed matches the current position, we haven't seen any duplicates
+            # this can only be guaranteed with a clean shutdown
+            assert consumer.current_position(partition) == consumer.total_consumed(), \
+                "Total consumed records did not match consumed position"
+        else:
+            # we may have duplicates in a hard failure
+            assert consumer.current_position(partition) <= consumer.total_consumed(), \
+                "Current position greater than the total number of consumed records"
+
+        # if autocommit is not turned on, we can also verify the last committed offset
+        if not enable_autocommit:
+            assert consumer.last_commit(partition) == consumer.current_position(partition), \
+                "Last committed offset did not match last consumed position"
+
+
+    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
+    def test_broker_failure(self, clean_shutdown, enable_autocommit):
+        partition = TopicPartition(self.TOPIC, 0)
+        
+        consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
+        producer = self.setup_producer(self.TOPIC)
+
+        producer.start()
+        consumer.start()
+        self.await_all_members(consumer)
+
+        num_rebalances = consumer.num_rebalances()
+
+        # shutdown one of the brokers
+        # TODO: we need a way to target the coordinator instead of picking arbitrarily
+        self.kafka.signal_node(self.kafka.nodes[0], signal.SIGTERM if clean_shutdown else signal.SIGKILL)
+
+        # ensure that the consumers do some work after the broker failure
+        self.await_consumed_messages(consumer, min_messages=1000)
+
+        # verify that there were no rebalances on failover
+        assert num_rebalances == consumer.num_rebalances(), "Broker failure should not cause a rebalance"
+
+        consumer.stop_all()
+
+        # if the total records consumed matches the current position, we haven't seen any duplicates
+        assert consumer.current_position(partition) == consumer.total_consumed(), \
+            "Total consumed records did not match consumed position"
+
+        # if autocommit is not turned on, we can also verify the last committed offset
+        if not enable_autocommit:
+            assert consumer.last_commit(partition) == consumer.current_position(partition), \
+                "Last committed offset did not match last consumed position"
+
+    def test_group_consumption(self):
+        """
+        Verifies correct group rebalance behavior as consumers are started and stopped. 
+        In particular, this test verifies that the partition is readable after every
+        expected rebalance.
+
+        Setup: single Kafka cluster with a group of consumers reading from one topic
+        with one partition while the verifiable producer writes to it.
+
+        - Start the consumers one by one, verifying consumption after each rebalance
+        - Shutdown the consumers one by one, verifying consumption after each rebalance
+        """
+        consumer = self.setup_consumer(self.TOPIC)
+        producer = self.setup_producer(self.TOPIC)
+
+        partition = TopicPartition(self.TOPIC, 0)
+
+        producer.start()
+
+        for num_started, node in enumerate(consumer.nodes, 1):
+            consumer.start_node(node)
+            self.await_members(consumer, num_started)
+            self.await_consumed_messages(consumer)
+
+        for num_stopped, node in enumerate(consumer.nodes, 1):
+            consumer.stop_node(node)
+
+            if num_stopped < self.num_consumers:
+                self.await_members(consumer, self.num_consumers - num_stopped)
+                self.await_consumed_messages(consumer)
+
+        assert consumer.current_position(partition) == consumer.total_consumed(), \
+            "Total consumed records did not match consumed position"
+
+        assert consumer.last_commit(partition) == consumer.current_position(partition), \
+            "Last committed offset did not match last consumed position"
+
+
+class AssignmentValidationTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 6
+
+    def __init__(self, test_context):
+        super(AssignmentValidationTest, self).__init__(test_context, num_consumers=3, num_producers=0,
+                                                num_zk=1, num_brokers=2, topics={
+            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 },
+        })
+
+    @matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
+                                 "org.apache.kafka.clients.consumer.RoundRobinAssignor"])
+    def test_valid_assignment(self, assignment_strategy):
+        """
+        Verify assignment strategy correctness: each partition is assigned to exactly
+        one consumer instance.
+
+        Setup: single Kafka cluster with a set of consumers in the same group.
+
+        - Start the consumers one by one
+        - Validate assignment after every expected rebalance
+        """
+        consumer = self.setup_consumer(self.TOPIC, assignment_strategy=assignment_strategy)
+        for num_started, node in enumerate(consumer.nodes, 1):
+            consumer.start_node(node)
+            self.await_members(consumer, num_started)
+            assert self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment())
+            

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client/message_format_change_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/message_format_change_test.py b/tests/kafkatest/tests/client/message_format_change_test.py
new file mode 100644
index 0000000..a57c04b
--- /dev/null
+++ b/tests/kafkatest/tests/client/message_format_change_test.py
@@ -0,0 +1,90 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+from kafkatest.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
+
+
+class MessageFormatChangeTest(ProduceConsumeValidateTest):
+
+    def __init__(self, test_context):
+        super(MessageFormatChangeTest, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+            
+        self.zk.start()
+
+        # Producer and consumer
+        self.producer_throughput = 10000
+        self.num_producers = 1
+        self.num_consumers = 1
+        self.messages_per_producer = 100
+
+    def produce_and_consume(self, producer_version, consumer_version, group):
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic,
+                                           throughput=self.producer_throughput,
+                                           message_validator=is_int,
+                                           version=KafkaVersion(producer_version))
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+                                        self.topic, new_consumer=False, consumer_timeout_ms=30000,
+                                        message_validator=is_int, version=KafkaVersion(consumer_version))
+        self.consumer.group_id = group
+        self.run_produce_consume_validate(lambda: wait_until(
+            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+            timeout_sec=120, backoff_sec=1,
+            err_msg="Producer did not produce all messages in reasonable amount of time"))
+        
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
+    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
+    def test_compatibility(self, producer_version, consumer_version):
+        """ This tests performs the following checks:
+        The workload is a mix of 0.9.x and 0.10.x producers and consumers 
+        that produce to and consume from a 0.10.x cluster
+        1. initially the topic is using message format 0.9.0
+        2. change the message format version for topic to 0.10.0 on the fly.
+        3. change the message format version for topic back to 0.9.0 on the fly.
+        - The producers and consumers should not have any issue.
+        - Note that for 0.9.x consumers/producers we only do steps 1 and 2
+        """
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
+                                                                    "partitions": 3,
+                                                                    "replication-factor": 3,
+                                                                    'configs': {"min.insync.replicas": 2}}})
+       
+        self.kafka.start()
+        self.logger.info("First format change to 0.9.0")
+        self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
+        self.produce_and_consume(producer_version, consumer_version, "group1")
+
+        self.logger.info("Second format change to 0.10.0")
+        self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
+        self.produce_and_consume(producer_version, consumer_version, "group2")
+
+        if producer_version == str(TRUNK) and consumer_version == str(TRUNK):
+            self.logger.info("Third format change back to 0.9.0")
+            self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
+            self.produce_and_consume(producer_version, consumer_version, "group3")
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
new file mode 100644
index 0000000..1d31569
--- /dev/null
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -0,0 +1,219 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark import matrix, parametrize
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.performance import ProducerPerformanceService
+from kafkatest.services.console_consumer import ConsoleConsumer
+
+class QuotaConfig(object):
+    CLIENT_ID = 'client-id'
+    USER = 'user'
+    USER_CLIENT = '(user, client-id)'
+
+    LARGE_QUOTA = 1000 * 1000 * 1000
+    USER_PRINCIPAL = 'CN=systemtest'
+
+    def __init__(self, quota_type, override_quota, kafka):
+        if quota_type == QuotaConfig.CLIENT_ID:
+            if override_quota:
+                self.client_id = 'overridden_id'
+                self.producer_quota = 3750000
+                self.consumer_quota = 3000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients', self.client_id])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
+            else:
+                self.client_id = 'default_id'
+                self.producer_quota = 2500000
+                self.consumer_quota = 2000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', 'overridden_id'])
+        elif quota_type == QuotaConfig.USER:
+            if override_quota:
+                self.client_id = 'some_id'
+                self.producer_quota = 3750000
+                self.consumer_quota = 3000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', self.client_id])
+            else:
+                self.client_id = 'some_id'
+                self.producer_quota = 2500000
+                self.consumer_quota = 2000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
+        elif quota_type == QuotaConfig.USER_CLIENT:
+            if override_quota:
+                self.client_id = 'overridden_id'
+                self.producer_quota = 3750000
+                self.consumer_quota = 3000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', self.client_id])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', self.client_id])
+            else:
+                self.client_id = 'default_id'
+                self.producer_quota = 2500000
+                self.consumer_quota = 2000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None, 'clients', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
+
+    def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, entity_args):
+        node = kafka.nodes[0]
+        cmd = "%s --zookeeper %s --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d" % \
+              (kafka.path.script("kafka-configs.sh", node), kafka.zk.connect_setting(), producer_byte_rate, consumer_byte_rate)
+        cmd += " --entity-type " + entity_args[0] + self.entity_name_opt(entity_args[1])
+        if len(entity_args) > 2:
+            cmd += " --entity-type " + entity_args[2] + self.entity_name_opt(entity_args[3])
+        node.account.ssh(cmd)
+
+    def entity_name_opt(self, name):
+        return " --entity-default" if name is None else " --entity-name " + name
+
+class QuotaTest(Test):
+    """
+    These tests verify that quota provides expected functionality -- they run
+    producer, broker, and consumer with different clientId and quota configuration and
+    check that the observed throughput is close to the value we expect.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(QuotaTest, self).__init__(test_context=test_context)
+
+        self.topic = 'test_topic'
+        self.logger.info('use topic ' + self.topic)
+
+        self.maximum_client_deviation_percentage = 100.0
+        self.maximum_broker_deviation_percentage = 5.0
+        self.num_records = 50000
+        self.record_size = 3000
+
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
+                                  security_protocol='SSL', authorizer_class_name='',
+                                  interbroker_security_protocol='SSL',
+                                  topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}},
+                                  jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
+                                                    'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],
+                                  jmx_attributes=['OneMinuteRate'])
+        self.num_producers = 1
+        self.num_consumers = 2
+
+    def setUp(self):
+        self.zk.start()
+        self.kafka.start()
+
+    def min_cluster_size(self):
+        """Override this since we're adding services outside of the constructor"""
+        return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+    @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False])
+    @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
+    def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):
+        self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
+        producer_client_id = self.quota_config.client_id
+        consumer_client_id = self.quota_config.client_id
+
+        # Produce all messages
+        producer = ProducerPerformanceService(
+            self.test_context, producer_num, self.kafka,
+            topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id,
+            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_client_id], jmx_attributes=['outgoing-byte-rate'])
+
+        producer.run()
+
+        # Consume all messages
+        consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
+            consumer_timeout_ms=60000, client_id=consumer_client_id,
+            jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s' % consumer_client_id],
+            jmx_attributes=['bytes-consumed-rate'])
+        consumer.run()
+
+        for idx, messages in consumer.messages_consumed.iteritems():
+            assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx
+
+        success, msg = self.validate(self.kafka, producer, consumer)
+        assert success, msg
+
+    def validate(self, broker, producer, consumer):
+        """
+        For each client_id we validate that:
+        1) number of consumed messages equals number of produced messages
+        2) maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
+        3) maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
+        4) maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
+        5) maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
+        """
+        success = True
+        msg = ''
+
+        self.kafka.read_jmx_output_all_nodes()
+
+        # validate that number of consumed messages equals number of produced messages
+        produced_num = sum([value['records'] for value in producer.results])
+        consumed_num = sum([len(value) for value in consumer.messages_consumed.values()])
+        self.logger.info('producer produced %d messages' % produced_num)
+        self.logger.info('consumer consumed %d messages' % consumed_num)
+        if produced_num != consumed_num:
+            success = False
+            msg += "number of produced messages %d doesn't equal number of consumed messages %d" % (produced_num, consumed_num)
+
+        # validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
+        producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % producer.client_id
+        producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
+        producer_quota_bps = self.quota_config.producer_quota
+        self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps))
+        if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum producer throughput %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
+                   (producer_maximum_bps, producer_quota_bps, self.maximum_client_deviation_percentage)
+
+        # validate that maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
+        broker_byte_in_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:OneMinuteRate'
+        broker_maximum_byte_in_bps = broker.maximum_jmx_value[broker_byte_in_attribute_name]
+        self.logger.info('broker has maximum byte-in rate %.2f bps with producer quota %.2f bps' %
+                         (broker_maximum_byte_in_bps, producer_quota_bps))
+        if broker_maximum_byte_in_bps > producer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum broker byte-in rate %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
+                   (broker_maximum_byte_in_bps, producer_quota_bps, self.maximum_broker_deviation_percentage)
+
+        # validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
+        consumer_attribute_name = 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s:bytes-consumed-rate' % consumer.client_id
+        consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name]
+        consumer_quota_bps = self.quota_config.consumer_quota
+        self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f bps' % (consumer_maximum_bps, consumer_quota_bps))
+        if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum consumer throughput %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
+                   (consumer_maximum_bps, consumer_quota_bps, self.maximum_client_deviation_percentage)
+
+        # validate that maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
+        broker_byte_out_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec:OneMinuteRate'
+        broker_maximum_byte_out_bps = broker.maximum_jmx_value[broker_byte_out_attribute_name]
+        self.logger.info('broker has maximum byte-out rate %.2f bps with consumer quota %.2f bps' %
+                         (broker_maximum_byte_out_bps, consumer_quota_bps))
+        if broker_maximum_byte_out_bps > consumer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum broker byte-out rate %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
+                   (broker_maximum_byte_out_bps, consumer_quota_bps, self.maximum_broker_deviation_percentage)
+
+        return success, msg
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client1/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client1/__init__.py b/tests/kafkatest/tests/client1/__init__.py
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client1/consumer_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client1/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client1/consumer_rolling_upgrade_test.py
deleted file mode 100644
index 3cd3c7c..0000000
--- a/tests/kafkatest/tests/client1/consumer_rolling_upgrade_test.py
+++ /dev/null
@@ -1,82 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.utils.util import wait_until
-
-from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
-from kafkatest.services.kafka import TopicPartition
-
-class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
-    TOPIC = "test_topic"
-    NUM_PARTITIONS = 4
-    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
-    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
-
-    def __init__(self, test_context):
-        super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_consumers=2, num_producers=0,
-                                                         num_zk=1, num_brokers=1, topics={
-            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }
-        })
-
-    def _verify_range_assignment(self, consumer):
-        # range assignment should give us two partition sets: (0, 1) and (2, 3)
-        assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()])
-        assert assignment == set([
-            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]),
-            frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])])
-
-    def _verify_roundrobin_assignment(self, consumer):
-        assignment = set([frozenset(x) for x in consumer.current_assignment().values()])
-        assert assignment == set([
-            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
-            frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
-
-    def rolling_update_test(self):
-        """
-        Verify rolling updates of partition assignment strategies works correctly. In this
-        test, we use a rolling restart to change the group's assignment strategy from "range" 
-        to "roundrobin." We verify after every restart that all members are still in the group
-        and that the correct assignment strategy was used.
-        """
-
-        # initialize the consumer using range assignment
-        consumer = self.setup_consumer(self.TOPIC, assignment_strategy=self.RANGE)
-
-        consumer.start()
-        self.await_all_members(consumer)
-        self._verify_range_assignment(consumer)
-
-        # change consumer configuration to prefer round-robin assignment, but still support range assignment
-        consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE
-
-        # restart one of the nodes and verify that we are still using range assignment
-        consumer.stop_node(consumer.nodes[0])
-        consumer.start_node(consumer.nodes[0])
-        self.await_all_members(consumer)
-        self._verify_range_assignment(consumer)
-        
-        # now restart the other node and verify that we have switched to round-robin
-        consumer.stop_node(consumer.nodes[1])
-        consumer.start_node(consumer.nodes[1])
-        self.await_all_members(consumer)
-        self._verify_roundrobin_assignment(consumer)
-
-        # if we want, we can now drop support for range assignment
-        consumer.assignment_strategy = self.ROUND_ROBIN
-        for node in consumer.nodes:
-            consumer.stop_node(node)
-            consumer.start_node(node)
-            self.await_all_members(consumer)
-            self._verify_roundrobin_assignment(consumer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client1/message_format_change_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client1/message_format_change_test.py b/tests/kafkatest/tests/client1/message_format_change_test.py
deleted file mode 100644
index a57c04b..0000000
--- a/tests/kafkatest/tests/client1/message_format_change_test.py
+++ /dev/null
@@ -1,90 +0,0 @@
-# Copyright 2015 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-from kafkatest.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
-
-
-class MessageFormatChangeTest(ProduceConsumeValidateTest):
-
-    def __init__(self, test_context):
-        super(MessageFormatChangeTest, self).__init__(test_context=test_context)
-
-    def setUp(self):
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-            
-        self.zk.start()
-
-        # Producer and consumer
-        self.producer_throughput = 10000
-        self.num_producers = 1
-        self.num_consumers = 1
-        self.messages_per_producer = 100
-
-    def produce_and_consume(self, producer_version, consumer_version, group):
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
-                                           self.topic,
-                                           throughput=self.producer_throughput,
-                                           message_validator=is_int,
-                                           version=KafkaVersion(producer_version))
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
-                                        self.topic, new_consumer=False, consumer_timeout_ms=30000,
-                                        message_validator=is_int, version=KafkaVersion(consumer_version))
-        self.consumer.group_id = group
-        self.run_produce_consume_validate(lambda: wait_until(
-            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
-            timeout_sec=120, backoff_sec=1,
-            err_msg="Producer did not produce all messages in reasonable amount of time"))
-        
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
-    def test_compatibility(self, producer_version, consumer_version):
-        """ This tests performs the following checks:
-        The workload is a mix of 0.9.x and 0.10.x producers and consumers 
-        that produce to and consume from a 0.10.x cluster
-        1. initially the topic is using message format 0.9.0
-        2. change the message format version for topic to 0.10.0 on the fly.
-        3. change the message format version for topic back to 0.9.0 on the fly.
-        - The producers and consumers should not have any issue.
-        - Note that for 0.9.x consumers/producers we only do steps 1 and 2
-        """
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
-                                                                    "partitions": 3,
-                                                                    "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}})
-       
-        self.kafka.start()
-        self.logger.info("First format change to 0.9.0")
-        self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
-        self.produce_and_consume(producer_version, consumer_version, "group1")
-
-        self.logger.info("Second format change to 0.10.0")
-        self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
-        self.produce_and_consume(producer_version, consumer_version, "group2")
-
-        if producer_version == str(TRUNK) and consumer_version == str(TRUNK):
-            self.logger.info("Third format change back to 0.9.0")
-            self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
-            self.produce_and_consume(producer_version, consumer_version, "group3")
-
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client1/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client1/quota_test.py b/tests/kafkatest/tests/client1/quota_test.py
deleted file mode 100644
index 1d31569..0000000
--- a/tests/kafkatest/tests/client1/quota_test.py
+++ /dev/null
@@ -1,219 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.tests.test import Test
-from ducktape.mark import matrix, parametrize
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.performance import ProducerPerformanceService
-from kafkatest.services.console_consumer import ConsoleConsumer
-
-class QuotaConfig(object):
-    CLIENT_ID = 'client-id'
-    USER = 'user'
-    USER_CLIENT = '(user, client-id)'
-
-    LARGE_QUOTA = 1000 * 1000 * 1000
-    USER_PRINCIPAL = 'CN=systemtest'
-
-    def __init__(self, quota_type, override_quota, kafka):
-        if quota_type == QuotaConfig.CLIENT_ID:
-            if override_quota:
-                self.client_id = 'overridden_id'
-                self.producer_quota = 3750000
-                self.consumer_quota = 3000000
-                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients', self.client_id])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
-            else:
-                self.client_id = 'default_id'
-                self.producer_quota = 2500000
-                self.consumer_quota = 2000000
-                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', 'overridden_id'])
-        elif quota_type == QuotaConfig.USER:
-            if override_quota:
-                self.client_id = 'some_id'
-                self.producer_quota = 3750000
-                self.consumer_quota = 3000000
-                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', self.client_id])
-            else:
-                self.client_id = 'some_id'
-                self.producer_quota = 2500000
-                self.consumer_quota = 2000000
-                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
-        elif quota_type == QuotaConfig.USER_CLIENT:
-            if override_quota:
-                self.client_id = 'overridden_id'
-                self.producer_quota = 3750000
-                self.consumer_quota = 3000000
-                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', self.client_id])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', self.client_id])
-            else:
-                self.client_id = 'default_id'
-                self.producer_quota = 2500000
-                self.consumer_quota = 2000000
-                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None, 'clients', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
-
-    def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, entity_args):
-        node = kafka.nodes[0]
-        cmd = "%s --zookeeper %s --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d" % \
-              (kafka.path.script("kafka-configs.sh", node), kafka.zk.connect_setting(), producer_byte_rate, consumer_byte_rate)
-        cmd += " --entity-type " + entity_args[0] + self.entity_name_opt(entity_args[1])
-        if len(entity_args) > 2:
-            cmd += " --entity-type " + entity_args[2] + self.entity_name_opt(entity_args[3])
-        node.account.ssh(cmd)
-
-    def entity_name_opt(self, name):
-        return " --entity-default" if name is None else " --entity-name " + name
-
-class QuotaTest(Test):
-    """
-    These tests verify that quota provides expected functionality -- they run
-    producer, broker, and consumer with different clientId and quota configuration and
-    check that the observed throughput is close to the value we expect.
-    """
-
-    def __init__(self, test_context):
-        """:type test_context: ducktape.tests.test.TestContext"""
-        super(QuotaTest, self).__init__(test_context=test_context)
-
-        self.topic = 'test_topic'
-        self.logger.info('use topic ' + self.topic)
-
-        self.maximum_client_deviation_percentage = 100.0
-        self.maximum_broker_deviation_percentage = 5.0
-        self.num_records = 50000
-        self.record_size = 3000
-
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
-                                  security_protocol='SSL', authorizer_class_name='',
-                                  interbroker_security_protocol='SSL',
-                                  topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}},
-                                  jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
-                                                    'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],
-                                  jmx_attributes=['OneMinuteRate'])
-        self.num_producers = 1
-        self.num_consumers = 2
-
-    def setUp(self):
-        self.zk.start()
-        self.kafka.start()
-
-    def min_cluster_size(self):
-        """Override this since we're adding services outside of the constructor"""
-        return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
-
-    @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False])
-    @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
-    def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):
-        self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
-        producer_client_id = self.quota_config.client_id
-        consumer_client_id = self.quota_config.client_id
-
-        # Produce all messages
-        producer = ProducerPerformanceService(
-            self.test_context, producer_num, self.kafka,
-            topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id,
-            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_client_id], jmx_attributes=['outgoing-byte-rate'])
-
-        producer.run()
-
-        # Consume all messages
-        consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
-            consumer_timeout_ms=60000, client_id=consumer_client_id,
-            jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s' % consumer_client_id],
-            jmx_attributes=['bytes-consumed-rate'])
-        consumer.run()
-
-        for idx, messages in consumer.messages_consumed.iteritems():
-            assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx
-
-        success, msg = self.validate(self.kafka, producer, consumer)
-        assert success, msg
-
-    def validate(self, broker, producer, consumer):
-        """
-        For each client_id we validate that:
-        1) number of consumed messages equals number of produced messages
-        2) maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
-        3) maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
-        4) maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
-        5) maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
-        """
-        success = True
-        msg = ''
-
-        self.kafka.read_jmx_output_all_nodes()
-
-        # validate that number of consumed messages equals number of produced messages
-        produced_num = sum([value['records'] for value in producer.results])
-        consumed_num = sum([len(value) for value in consumer.messages_consumed.values()])
-        self.logger.info('producer produced %d messages' % produced_num)
-        self.logger.info('consumer consumed %d messages' % consumed_num)
-        if produced_num != consumed_num:
-            success = False
-            msg += "number of produced messages %d doesn't equal number of consumed messages %d" % (produced_num, consumed_num)
-
-        # validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
-        producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % producer.client_id
-        producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
-        producer_quota_bps = self.quota_config.producer_quota
-        self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps))
-        if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
-            success = False
-            msg += 'maximum producer throughput %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
-                   (producer_maximum_bps, producer_quota_bps, self.maximum_client_deviation_percentage)
-
-        # validate that maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
-        broker_byte_in_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:OneMinuteRate'
-        broker_maximum_byte_in_bps = broker.maximum_jmx_value[broker_byte_in_attribute_name]
-        self.logger.info('broker has maximum byte-in rate %.2f bps with producer quota %.2f bps' %
-                         (broker_maximum_byte_in_bps, producer_quota_bps))
-        if broker_maximum_byte_in_bps > producer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
-            success = False
-            msg += 'maximum broker byte-in rate %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
-                   (broker_maximum_byte_in_bps, producer_quota_bps, self.maximum_broker_deviation_percentage)
-
-        # validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
-        consumer_attribute_name = 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s:bytes-consumed-rate' % consumer.client_id
-        consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name]
-        consumer_quota_bps = self.quota_config.consumer_quota
-        self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f bps' % (consumer_maximum_bps, consumer_quota_bps))
-        if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
-            success = False
-            msg += 'maximum consumer throughput %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
-                   (consumer_maximum_bps, consumer_quota_bps, self.maximum_client_deviation_percentage)
-
-        # validate that maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
-        broker_byte_out_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec:OneMinuteRate'
-        broker_maximum_byte_out_bps = broker.maximum_jmx_value[broker_byte_out_attribute_name]
-        self.logger.info('broker has maximum byte-out rate %.2f bps with consumer quota %.2f bps' %
-                         (broker_maximum_byte_out_bps, consumer_quota_bps))
-        if broker_maximum_byte_out_bps > consumer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
-            success = False
-            msg += 'maximum broker byte-out rate %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
-                   (broker_maximum_byte_out_bps, consumer_quota_bps, self.maximum_broker_deviation_percentage)
-
-        return success, msg
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client2/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client2/__init__.py b/tests/kafkatest/tests/client2/__init__.py
deleted file mode 100644
index ec20143..0000000
--- a/tests/kafkatest/tests/client2/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.