You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2016/11/24 04:49:03 UTC

[1/4] kafka git commit: KAFKA-4345; Run decktape test for each pull request

Repository: kafka
Updated Branches:
  refs/heads/trunk 724cddbc5 -> e035fc039


http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/security2/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/security2/security_rolling_upgrade_test.py b/tests/kafkatest/tests/security2/security_rolling_upgrade_test.py
new file mode 100644
index 0000000..51b2e60
--- /dev/null
+++ b/tests/kafkatest/tests/security2/security_rolling_upgrade_test.py
@@ -0,0 +1,190 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.utils import is_int
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from ducktape.mark import parametrize
+from ducktape.mark import matrix
+from kafkatest.services.security.kafka_acls import ACLs
+import time
+
+
+class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
+    """Tests a rolling upgrade from PLAINTEXT to a secured cluster
+    """
+
+    def __init__(self, test_context):
+        super(TestSecurityRollingUpgrade, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.acls = ACLs(self.test_context)
+        self.topic = "test_topic"
+        self.group = "group"
+        self.producer_throughput = 100
+        self.num_producers = 1
+        self.num_consumers = 1
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
+            "partitions": 3,
+            "replication-factor": 3,
+            'configs': {"min.insync.replicas": 2}}})
+        self.zk.start()
+
+    def create_producer_and_consumer(self):
+        self.producer = VerifiableProducer(
+            self.test_context, self.num_producers, self.kafka, self.topic,
+            throughput=self.producer_throughput)
+
+        self.consumer = ConsoleConsumer(
+            self.test_context, self.num_consumers, self.kafka, self.topic,
+            consumer_timeout_ms=60000, message_validator=is_int)
+
+        self.consumer.group_id = "group"
+
+    def bounce(self):
+        self.kafka.start_minikdc()
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            self.kafka.start_node(node)
+            time.sleep(10)
+
+    def roll_in_secured_settings(self, client_protocol, broker_protocol):
+
+        # Roll cluster to include inter broker security protocol.
+        self.kafka.interbroker_security_protocol = broker_protocol
+        self.kafka.open_port(client_protocol)
+        self.kafka.open_port(broker_protocol)
+        self.bounce()
+
+        # Roll cluster to disable PLAINTEXT port
+        self.kafka.close_port('PLAINTEXT')
+        self.set_authorizer_and_bounce(client_protocol, broker_protocol)
+
+    def set_authorizer_and_bounce(self, client_protocol, broker_protocol):
+        self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
+        self.acls.set_acls(client_protocol, self.kafka, self.zk, self.topic, self.group)
+        self.acls.set_acls(broker_protocol, self.kafka, self.zk, self.topic, self.group)
+        self.bounce()
+
+    def open_secured_port(self, client_protocol):
+        self.kafka.security_protocol = client_protocol
+        self.kafka.open_port(client_protocol)
+        self.kafka.start_minikdc()
+        self.bounce()
+
+    def add_sasl_mechanism(self, new_client_sasl_mechanism):
+        self.kafka.client_sasl_mechanism = new_client_sasl_mechanism
+        self.kafka.start_minikdc()
+        self.bounce()
+
+    def roll_in_sasl_mechanism(self, security_protocol, new_sasl_mechanism):
+        # Roll cluster to update inter-broker SASL mechanism. This disables the old mechanism.
+        self.kafka.interbroker_sasl_mechanism = new_sasl_mechanism
+        self.bounce()
+
+        # Bounce again with ACLs for new mechanism
+        self.set_authorizer_and_bounce(security_protocol, security_protocol)
+
+    @matrix(client_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"])
+    def test_rolling_upgrade_phase_one(self, client_protocol):
+        """
+        Start with a PLAINTEXT cluster, open a SECURED port, via a rolling upgrade, ensuring we could produce
+        and consume throughout over PLAINTEXT. Finally check we can produce and consume the new secured port.
+        """
+        self.kafka.interbroker_security_protocol = "PLAINTEXT"
+        self.kafka.security_protocol = "PLAINTEXT"
+        self.kafka.start()
+
+        # Create PLAINTEXT producer and consumer
+        self.create_producer_and_consumer()
+
+        # Rolling upgrade, opening a secure protocol, ensuring the Plaintext producer/consumer continues to run
+        self.run_produce_consume_validate(self.open_secured_port, client_protocol)
+
+        # Now we can produce and consume via the secured port
+        self.kafka.security_protocol = client_protocol
+        self.create_producer_and_consumer()
+        self.run_produce_consume_validate(lambda: time.sleep(1))
+
+    @matrix(client_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"], broker_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"])
+    def test_rolling_upgrade_phase_two(self, client_protocol, broker_protocol):
+        """
+        Start with a PLAINTEXT cluster with a second Secured port open (i.e. result of phase one).
+        Start an Producer and Consumer via the SECURED port
+        Incrementally upgrade to add inter-broker be the secure protocol
+        Incrementally upgrade again to add ACLs as well as disabling the PLAINTEXT port
+        Ensure the producer and consumer ran throughout
+        """
+        #Given we have a broker that has both secure and PLAINTEXT ports open
+        self.kafka.security_protocol = client_protocol
+        self.kafka.interbroker_security_protocol = "PLAINTEXT"
+        self.kafka.start()
+
+        #Create Secured Producer and Consumer
+        self.create_producer_and_consumer()
+
+        #Roll in the security protocol. Disable Plaintext. Ensure we can produce and Consume throughout
+        self.run_produce_consume_validate(self.roll_in_secured_settings, client_protocol, broker_protocol)
+
+    @parametrize(new_client_sasl_mechanism='PLAIN')
+    def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism):
+        """
+        Start with a SASL/GSSAPI cluster, add new SASL mechanism, via a rolling upgrade, ensuring we could produce
+        and consume throughout over SASL/GSSAPI. Finally check we can produce and consume using new mechanism.
+        """
+        self.kafka.interbroker_security_protocol = "SASL_SSL"
+        self.kafka.security_protocol = "SASL_SSL"
+        self.kafka.client_sasl_mechanism = "GSSAPI"
+        self.kafka.interbroker_sasl_mechanism = "GSSAPI"
+        self.kafka.start()
+
+        # Create SASL/GSSAPI producer and consumer
+        self.create_producer_and_consumer()
+
+        # Rolling upgrade, adding new SASL mechanism, ensuring the GSSAPI producer/consumer continues to run
+        self.run_produce_consume_validate(self.add_sasl_mechanism, new_client_sasl_mechanism)
+
+        # Now we can produce and consume using the new SASL mechanism
+        self.kafka.client_sasl_mechanism = new_client_sasl_mechanism
+        self.create_producer_and_consumer()
+        self.run_produce_consume_validate(lambda: time.sleep(1))
+
+    @parametrize(new_sasl_mechanism='PLAIN')
+    def test_rolling_upgrade_sasl_mechanism_phase_two(self, new_sasl_mechanism):
+        """
+        Start with a SASL cluster with GSSAPI for inter-broker and a second mechanism for clients (i.e. result of phase one).
+        Start Producer and Consumer using the second mechanism
+        Incrementally upgrade to set inter-broker to the second mechanism and disable GSSAPI
+        Incrementally upgrade again to add ACLs
+        Ensure the producer and consumer run throughout
+        """
+        #Start with a broker that has GSSAPI for inter-broker and a second mechanism for clients
+        self.kafka.security_protocol = "SASL_SSL"
+        self.kafka.interbroker_security_protocol = "SASL_SSL"
+        self.kafka.client_sasl_mechanism = new_sasl_mechanism
+        self.kafka.interbroker_sasl_mechanism = "GSSAPI"
+        self.kafka.start()
+
+        #Create Producer and Consumer using second mechanism
+        self.create_producer_and_consumer()
+
+        #Roll in the second SASL mechanism for inter-broker, disabling first mechanism. Ensure we can produce and consume throughout
+        self.run_produce_consume_validate(self.roll_in_sasl_mechanism, self.kafka.security_protocol, new_sasl_mechanism)
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/upgrade/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/upgrade/__init__.py b/tests/kafkatest/tests/upgrade/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/upgrade/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/upgrade/upgrade_test.py b/tests/kafkatest/tests/upgrade/upgrade_test.py
new file mode 100644
index 0000000..26c7099
--- /dev/null
+++ b/tests/kafkatest/tests/upgrade/upgrade_test.py
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+
+import json
+
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, TRUNK, KafkaVersion
+
+class TestUpgrade(ProduceConsumeValidateTest):
+
+    def __init__(self, test_context):
+        super(TestUpgrade, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.zk.start()
+
+        # Producer and consumer
+        self.producer_throughput = 10000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+    def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
+        self.logger.info("First pass bounce - rolling upgrade")
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            node.version = TRUNK
+            node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = from_kafka_version
+            node.config[config_property.MESSAGE_FORMAT_VERSION] = from_kafka_version
+            self.kafka.start_node(node)
+
+        self.logger.info("Second pass bounce - remove inter.broker.protocol.version config")
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
+            if to_message_format_version is None:
+                del node.config[config_property.MESSAGE_FORMAT_VERSION]
+            else:
+                node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
+            self.kafka.start_node(node)
+
+    @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
+    @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"])
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], new_consumer=False)
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], security_protocol="SASL_SSL")
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"])
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"], new_consumer=False)
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"])
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False)
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["snappy"])
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"], new_consumer=False)
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"])
+    @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"], new_consumer=False)
+    @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
+    def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types,
+                     new_consumer=True, security_protocol="PLAINTEXT"):
+        """Test upgrade of Kafka broker cluster from 0.8.2, 0.9.0 or 0.10.0 to the current version
+
+        from_kafka_version is a Kafka version to upgrade from: either 0.8.2.X, 0.9.0.x or 0.10.0.x
+
+        If to_message_format_version is None, it means that we will upgrade to default (latest)
+        message format version. It is possible to upgrade to 0.10 brokers but still use message
+        format version 0.9
+
+        - Start 3 node broker cluster on version 'from_kafka_version'
+        - Start producer and consumer in the background
+        - Perform two-phase rolling upgrade
+            - First phase: upgrade brokers to 0.10 with inter.broker.protocol.version set to
+            from_kafka_version and log.message.format.version set to from_kafka_version
+            - Second phase: remove inter.broker.protocol.version config with rolling bounce; if
+            to_message_format_version is set to 0.9, set log.message.format.version to
+            to_message_format_version, otherwise remove log.message.format.version config
+        - Finally, validate that every message acked by the producer was consumed by the consumer
+        """
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk,
+                                  version=KafkaVersion(from_kafka_version),
+                                  topics={self.topic: {"partitions": 3, "replication-factor": 3,
+                                                       'configs': {"min.insync.replicas": 2}}})
+        self.kafka.security_protocol = security_protocol
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.start()
+
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic, throughput=self.producer_throughput,
+                                           message_validator=is_int,
+                                           compression_types=compression_types,
+                                           version=KafkaVersion(from_kafka_version))
+
+        assert self.zk.query("/cluster/id") is None
+
+        # TODO - reduce the timeout
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+                                        self.topic, consumer_timeout_ms=200000, new_consumer=new_consumer,
+                                        message_validator=is_int, version=KafkaVersion(from_kafka_version))
+
+        self.run_produce_consume_validate(core_test_action=lambda: self.perform_upgrade(from_kafka_version,
+                                                                                        to_message_format_version))
+
+        cluster_id_json = self.zk.query("/cluster/id")
+        assert cluster_id_json is not None
+        try:
+            cluster_id = json.loads(cluster_id_json)
+        except :
+            self.logger.debug("Data in /cluster/id znode could not be parsed. Data = %s" % cluster_id_json)
+
+        self.logger.debug("Cluster id [%s]", cluster_id)
+        assert len(cluster_id["id"]) == 22

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/travis/Dockerfile
----------------------------------------------------------------------
diff --git a/tests/travis/Dockerfile b/tests/travis/Dockerfile
new file mode 100644
index 0000000..48f8bd6
--- /dev/null
+++ b/tests/travis/Dockerfile
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM openjdk:8
+
+MAINTAINER Raghav Kumar Gautam
+# commands to update docker image:
+#   - docker build . -t raghavgautam/kfk-image
+#   - docker push raghavgautam/kfk-image
+RUN apt update
+RUN apt install -y unzip wget curl jq coreutils openssh-server net-tools vim openjdk-8-jdk python-pip
+RUN pip install ducktape
+
+VOLUME ["/kafka"]
+VOLUME ["/kfk_src"]
+
+ENV MIRROR="http://apache.cs.utah.edu/"
+RUN wget -q "${MIRROR}kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz" -O "/tmp/kafka_2.10-0.8.2.2.tgz" && tar xfz /tmp/kafka_2.10-0.8.2.2.tgz -C /opt && mv "/opt/kafka_2.10-0.8.2.2" "/opt/kafka-0.8.2.2"
+RUN wget -q "${MIRROR}kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz" -O "/tmp/kafka_2.10-0.9.0.1.tgz" && tar xfz /tmp/kafka_2.10-0.9.0.1.tgz -C /opt && mv "/opt/kafka_2.10-0.9.0.1" "/opt/kafka-0.9.0.1"
+RUN wget -q "${MIRROR}kafka/0.10.0.1/kafka_2.10-0.10.0.1.tgz" -O "/tmp/kafka_2.10-0.10.0.1.tgz" && tar xfz /tmp/kafka_2.10-0.10.0.1.tgz -C /opt && mv "/opt/kafka_2.10-0.10.0.1" "/opt/kafka-0.10.0.1"
+
+RUN rm /tmp/kafka_*.tgz
+ADD ssh /root/.ssh
+RUN chmod 600 /root/.ssh/id_rsa
+
+CMD service ssh start && tail -f /dev/null

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/travis/run_tests.sh
----------------------------------------------------------------------
diff --git a/tests/travis/run_tests.sh b/tests/travis/run_tests.sh
new file mode 100755
index 0000000..6660625
--- /dev/null
+++ b/tests/travis/run_tests.sh
@@ -0,0 +1,58 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# To run tests use a command like:
+#   TC_PATHS="tests/kafkatest/tests/streams tests/kafkatest/tests/tools" bash tests/travis/run_tests.sh
+set -x
+
+SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+TESTS_DIR=`dirname ${SCRIPT_DIR}`
+KFK_SRC=`dirname ${TESTS_DIR}`
+
+
+cd ${SCRIPT_DIR}
+chmod 600 ssh/id_rsa
+
+docker network rm knw
+docker network create knw
+
+docker kill $(docker ps -f=network=knw -q)
+docker rm $(docker ps -a -f=network=knw -q)
+
+for i in $(seq -w 1 12); do
+  docker run -d -t --name knode${i} --network knw -v ${KFK_SRC}:/kfk_src raghavgautam/kfk-image
+done
+
+docker info
+docker ps
+docker network inspect knw
+
+for i in $(seq -w 1 12); do
+  echo knode${i}
+  docker exec knode${i} bash -c "(tar xfz /kfk_src/core/build/distributions/kafka_*SNAPSHOT.tgz -C /opt || echo missing kafka tgz did you build kafka tarball) && mv /opt/kafka*SNAPSHOT /opt/kafka-trunk && ls -l /opt"
+  docker exec knode01 bash -c "ssh knode$i hostname"
+done
+
+# hack to copy test dependencies
+# this is required for running MiniKDC
+(cd ${KFK_SRC} && ./gradlew copyDependantTestLibs)
+for i in $(seq -w 1 12); do
+  echo knode${i}
+  docker exec knode${i} bash -c "cp /kfk_src/core/build/dependant-testlibs/* /opt/kafka-trunk/libs/"
+  docker exec knode01 bash -c "ssh knode$i hostname"
+done
+
+docker exec knode01 bash -c "cd /kfk_src; ducktape ${_DUCKTAPE_OPTIONS} --cluster-file tests/cluster_file.json ${TC_PATHS:-tests/kafkatest/tests}"

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/travis/ssh/authorized_keys
----------------------------------------------------------------------
diff --git a/tests/travis/ssh/authorized_keys b/tests/travis/ssh/authorized_keys
new file mode 100644
index 0000000..9f9da1f
--- /dev/null
+++ b/tests/travis/ssh/authorized_keys
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC0qDT9kEPWc8JQ53b4KnT/ZJOLwb+3c//jpLW/2ofjDyIsPW4FohLpicfouch/zsRpN4G38lua+2BsGls9sMIZc6PXY2L+NIGCkqEMdCoU1Ym8SMtyJklfzp3m/0PeK9s2dLlR3PFRYvyFA4btQK5hkbYDNZPzf4airvzdRzLkrFf81+RemaMI2EtONwJRcbLViPaTXVKJdbFwJTJ1u7yu9wDYWHKBMA92mHTQeP6bhVYCqxJn3to/RfZYd+sHw6mfxVg5OrAlUOYpSV4pDNCAsIHdtZ56V8NQlJL6NJ2vzzSSYUwLMqe88fhrC8yYHoxC07QPy1EdkSTHdohAicyT root@knode01.knw

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/travis/ssh/config
----------------------------------------------------------------------
diff --git a/tests/travis/ssh/config b/tests/travis/ssh/config
new file mode 100644
index 0000000..1f87417
--- /dev/null
+++ b/tests/travis/ssh/config
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Host *
+  ControlMaster auto
+  ControlPath ~/.ssh/master-%r@%h:%p
+  StrictHostKeyChecking no
+  ConnectTimeout=10
+  IdentityFile ~/.ssh/id_rsa

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/travis/ssh/id_rsa
----------------------------------------------------------------------
diff --git a/tests/travis/ssh/id_rsa b/tests/travis/ssh/id_rsa
new file mode 100644
index 0000000..276e07b
--- /dev/null
+++ b/tests/travis/ssh/id_rsa
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpQIBAAKCAQEAtKg0/ZBD1nPCUOd2+Cp0/2STi8G/t3P/46S1v9qH4w8iLD1u
+BaIS6YnH6LnIf87EaTeBt/JbmvtgbBpbPbDCGXOj12Ni/jSBgpKhDHQqFNWJvEjL
+ciZJX86d5v9D3ivbNnS5UdzxUWL8hQOG7UCuYZG2AzWT83+Goq783Ucy5KxX/Nfk
+XpmjCNhLTjcCUXGy1Yj2k11SiXWxcCUydbu8rvcA2FhygTAPdph00Hj+m4VWAqsS
+Z97aP0X2WHfrB8Opn8VYOTqwJVDmKUleKQzQgLCB3bWeelfDUJSS+jSdr880kmFM
+CzKnvPH4awvMmB6MQtO0D8tRHZEkx3aIQInMkwIDAQABAoIBAQCz6EMFNNLp0NP1
+X9yRXS6wW4e4CRWUazesiw3YZpcmnp6IchCMGZA99FEZyVILPW1J3tYWyotBdw7Z
++RFeCRXy5L+IMtiVkNJcpwss7M4ve0w0LkY0gj5V49xJ+3Gp4gDnZSxcguvrAem5
+yP5obR572fDpl0SknB4HCr6U2l+rauzrLyevy5eeDT/vmXbuM1cdHpNIXmmElz4L
+t31n+exQRn6tP1h516iXbcYbopxDgdv2qKGAqzWKE6TyWpzF5x7kjOEYt0bZ5QO3
+Lwh7AAqE/3mwxlYwng1L4WAT7RtcP19W+9JDIc7ENInMGxq6q46p1S3IPZsf1cj/
+aAJ9q3LBAoGBAOVJr0+WkR786n3BuswpGQWBgVxfai4y9Lf90vuGKawdQUzXv0/c
+EB/CFqP/dIsquukA8PfzjNMyTNmEHXi4Sf16H8Rg4EGhIYMEqIQojx1t/yLLm0aU
+YPEvW/02Umtlg3pJw9fQAAzFVqCasw2E2lUdAUkydGRwDUJZmv2/b3NzAoGBAMm0
+Jo7Et7ochH8Vku6uA+hG+RdwlKFm5JA7/Ci3DOdQ1zmJNrvBBFQLo7AjA4iSCoBd
+s9+y0nrSPcF4pM3l6ghLheaqbnIi2HqIMH9mjDbrOZiWvbnjvjpOketgNX8vV3Ye
+GUkSjoNcmvRmdsICmUjeML8bGOmq4zF9W/GIfTphAoGBAKGRo8R8f/SLGh3VtvCI
+gUY89NAHuEWnyIQii1qMNq8+yjYAzaHTm1UVqmiT6SbrzFvGOwcuCu0Dw91+2Fmp
+2xGPzfTOoxf8GCY/0ROXlQmS6jc1rEw24Hzz92ldrwRYuyYf9q4Ltw1IvXtcp5F+
+LW/OiYpv0E66Gs3HYI0wKbP7AoGBAJMZWeFW37LQJ2TTJAQDToAwemq4xPxsoJX7
+2SsMTFHKKBwi0JLe8jwk/OxwrJwF/bieHZcvv8ao2zbkuDQcz6/a/D074C5G8V9z
+QQM4k1td8vQwQw91Yv782/gvgvRNX1iaHNCowtxURgGlVEirQoTc3eoRZfrLkMM/
+7DTa2JEhAoGACEu3zHJ1sgyeOEgLArUJXlQM30A/ulMrnCd4MEyIE+ReyWAUevUQ
+0lYdVNva0/W4C5e2lUOJL41jjIPLqI7tcFR2PZE6n0xTTkxNH5W2u1WpFeKjx+O3
+czv7Bt6wYyLHIMy1JEqAQ7pw1mtJ5s76UDvXUhciF+DU2pWYc6APKR0=
+-----END RSA PRIVATE KEY-----

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/travis/ssh/id_rsa.pub
----------------------------------------------------------------------
diff --git a/tests/travis/ssh/id_rsa.pub b/tests/travis/ssh/id_rsa.pub
new file mode 100644
index 0000000..76e8f5f
--- /dev/null
+++ b/tests/travis/ssh/id_rsa.pub
@@ -0,0 +1 @@
+ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC0qDT9kEPWc8JQ53b4KnT/ZJOLwb+3c//jpLW/2ofjDyIsPW4FohLpicfouch/zsRpN4G38lua+2BsGls9sMIZc6PXY2L+NIGCkqEMdCoU1Ym8SMtyJklfzp3m/0PeK9s2dLlR3PFRYvyFA4btQK5hkbYDNZPzf4airvzdRzLkrFf81+RemaMI2EtONwJRcbLViPaTXVKJdbFwJTJ1u7yu9wDYWHKBMA92mHTQeP6bhVYCqxJn3to/RfZYd+sHw6mfxVg5OrAlUOYpSV4pDNCAsIHdtZ56V8NQlJL6NJ2vzzSSYUwLMqe88fhrC8yYHoxC07QPy1EdkSTHdohAicyT root@knode01.knw


[2/4] kafka git commit: KAFKA-4345; Run decktape test for each pull request

Posted by sr...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py
deleted file mode 100644
index 15a9696..0000000
--- a/tests/kafkatest/tests/core/upgrade_test.py
+++ /dev/null
@@ -1,128 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-
-import json
-
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka import config_property
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, TRUNK, KafkaVersion
-
-class TestUpgrade(ProduceConsumeValidateTest):
-
-    def __init__(self, test_context):
-        super(TestUpgrade, self).__init__(test_context=test_context)
-
-    def setUp(self):
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.zk.start()
-
-        # Producer and consumer
-        self.producer_throughput = 10000
-        self.num_producers = 1
-        self.num_consumers = 1
-
-    def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
-        self.logger.info("First pass bounce - rolling upgrade")
-        for node in self.kafka.nodes:
-            self.kafka.stop_node(node)
-            node.version = TRUNK
-            node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = from_kafka_version
-            node.config[config_property.MESSAGE_FORMAT_VERSION] = from_kafka_version
-            self.kafka.start_node(node)
-
-        self.logger.info("Second pass bounce - remove inter.broker.protocol.version config")
-        for node in self.kafka.nodes:
-            self.kafka.stop_node(node)
-            del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
-            if to_message_format_version is None:
-                del node.config[config_property.MESSAGE_FORMAT_VERSION]
-            else:
-                node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
-            self.kafka.start_node(node)
-
-    @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
-    @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"])
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], new_consumer=False)
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], security_protocol="SASL_SSL")
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"])
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"], new_consumer=False)
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"])
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False)
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["snappy"])
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"], new_consumer=False)
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"])
-    @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"], new_consumer=False)
-    @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
-    def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types,
-                     new_consumer=True, security_protocol="PLAINTEXT"):
-        """Test upgrade of Kafka broker cluster from 0.8.2, 0.9.0 or 0.10.0 to the current version
-
-        from_kafka_version is a Kafka version to upgrade from: either 0.8.2.X, 0.9.0.x or 0.10.0.x
-
-        If to_message_format_version is None, it means that we will upgrade to default (latest)
-        message format version. It is possible to upgrade to 0.10 brokers but still use message
-        format version 0.9
-
-        - Start 3 node broker cluster on version 'from_kafka_version'
-        - Start producer and consumer in the background
-        - Perform two-phase rolling upgrade
-            - First phase: upgrade brokers to 0.10 with inter.broker.protocol.version set to
-            from_kafka_version and log.message.format.version set to from_kafka_version
-            - Second phase: remove inter.broker.protocol.version config with rolling bounce; if
-            to_message_format_version is set to 0.9, set log.message.format.version to
-            to_message_format_version, otherwise remove log.message.format.version config
-        - Finally, validate that every message acked by the producer was consumed by the consumer
-        """
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk,
-                                  version=KafkaVersion(from_kafka_version),
-                                  topics={self.topic: {"partitions": 3, "replication-factor": 3,
-                                                       'configs': {"min.insync.replicas": 2}}})
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = security_protocol
-        self.kafka.start()
-
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
-                                           self.topic, throughput=self.producer_throughput,
-                                           message_validator=is_int,
-                                           compression_types=compression_types,
-                                           version=KafkaVersion(from_kafka_version))
-
-        assert self.zk.query("/cluster/id") is None
-
-        # TODO - reduce the timeout
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
-                                        self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
-                                        message_validator=is_int, version=KafkaVersion(from_kafka_version))
-
-        self.run_produce_consume_validate(core_test_action=lambda: self.perform_upgrade(from_kafka_version,
-                                                                                        to_message_format_version))
-
-        cluster_id_json = self.zk.query("/cluster/id")
-        assert cluster_id_json is not None
-        try:
-            cluster_id = json.loads(cluster_id_json)
-        except :
-            self.logger.debug("Data in /cluster/id znode could not be parsed. Data = %s" % cluster_id_json)
-
-        self.logger.debug("Cluster id [%s]", cluster_id)
-        assert len(cluster_id["id"]) == 22

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
deleted file mode 100644
index 0cfdf16..0000000
--- a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
+++ /dev/null
@@ -1,115 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import matrix
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.services.security.kafka_acls import ACLs
-from kafkatest.utils import is_int
-
-class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
-    """Tests a rolling upgrade for zookeeper.
-    """
-
-    def __init__(self, test_context):
-        super(ZooKeeperSecurityUpgradeTest, self).__init__(test_context=test_context)
-
-    def setUp(self):
-        self.topic = "test_topic"
-        self.group = "group"
-        self.producer_throughput = 100
-        self.num_producers = 1
-        self.num_consumers = 1
-        self.acls = ACLs(self.test_context)
-
-        self.zk = ZookeeperService(self.test_context, num_nodes=3)
-
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
-            "partitions": 3,
-            "replication-factor": 3,
-            'configs': {"min.insync.replicas": 2}}})
-
-    def create_producer_and_consumer(self):
-        self.producer = VerifiableProducer(
-            self.test_context, self.num_producers, self.kafka, self.topic,
-            throughput=self.producer_throughput)
-
-        self.consumer = ConsoleConsumer(
-            self.test_context, self.num_consumers, self.kafka, self.topic,
-            consumer_timeout_ms=60000, message_validator=is_int)
-
-        self.consumer.group_id = self.group
-
-    @property
-    def no_sasl(self):
-        return self.kafka.security_protocol == "PLAINTEXT" or self.kafka.security_protocol == "SSL"
-
-    @property
-    def is_secure(self):
-        return self.kafka.security_protocol == "SASL_PLAINTEXT" \
-               or self.kafka.security_protocol == "SSL" \
-               or self.kafka.security_protocol == "SASL_SSL"
-
-    def run_zk_migration(self):
-        # change zk config (auth provider + jaas login)
-        self.zk.kafka_opts = self.zk.security_system_properties
-        self.zk.zk_sasl = True
-        if self.no_sasl:
-            self.kafka.start_minikdc(self.zk.zk_principals)
-        # restart zk
-        for node in self.zk.nodes:
-            self.zk.stop_node(node)
-            self.zk.start_node(node)
-
-        # restart broker with jaas login
-        for node in self.kafka.nodes:
-            self.kafka.stop_node(node)
-            self.kafka.start_node(node)
-
-        # run migration tool
-        for node in self.zk.nodes:
-            self.zk.zookeeper_migration(node, "secure")
-
-        # restart broker with zookeeper.set.acl=true and acls
-        self.kafka.zk_set_acl = True
-        for node in self.kafka.nodes:
-            self.kafka.stop_node(node)
-            self.kafka.start_node(node)
-
-    @matrix(security_protocol=["PLAINTEXT","SSL","SASL_SSL","SASL_PLAINTEXT"])
-    def test_zk_security_upgrade(self, security_protocol):
-        self.zk.start()
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = security_protocol
-
-        # set acls
-        if self.is_secure:
-            self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
-            self.acls.set_acls(security_protocol, self.kafka, self.zk, self.topic, self.group)
-
-        if(self.no_sasl):
-            self.kafka.start()
-        else:
-            self.kafka.start(self.zk.zk_principals)
-
-        #Create Producer and Consumer
-        self.create_producer_and_consumer()
-
-        #Run upgrade
-        self.run_produce_consume_validate(self.run_zk_migration)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core1/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core1/__init__.py b/tests/kafkatest/tests/core1/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core1/consumer_group_command_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core1/consumer_group_command_test.py b/tests/kafkatest/tests/core1/consumer_group_command_test.py
new file mode 100644
index 0000000..c3f59d9
--- /dev/null
+++ b/tests/kafkatest/tests/core1/consumer_group_command_test.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.utils.util import wait_until
+from ducktape.tests.test import Test
+from ducktape.mark import matrix
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.security.security_config import SecurityConfig
+
+import os
+import re
+
+TOPIC = "topic-consumer-group-command"
+
+class ConsumerGroupCommandTest(Test):
+    """
+    Tests ConsumerGroupCommand
+    """
+    # Root directory for persistent output
+    PERSISTENT_ROOT = "/mnt/consumer_group_command"
+    COMMAND_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "command.properties")
+
+    def __init__(self, test_context):
+        super(ConsumerGroupCommandTest, self).__init__(test_context)
+        self.num_zk = 1
+        self.num_brokers = 1
+        self.topics = {
+            TOPIC: {'partitions': 1, 'replication-factor': 1}
+        }
+        self.zk = ZookeeperService(test_context, self.num_zk)
+
+    def setUp(self):
+        self.zk.start()
+
+    def start_kafka(self, security_protocol, interbroker_security_protocol):
+        self.kafka = KafkaService(
+            self.test_context, self.num_brokers,
+            self.zk, security_protocol=security_protocol,
+            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
+        self.kafka.start()
+
+    def start_consumer(self, security_protocol):
+        enable_new_consumer = security_protocol == SecurityConfig.SSL
+        self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
+                                        consumer_timeout_ms=None, new_consumer=enable_new_consumer)
+        self.consumer.start()
+
+    def setup_and_verify(self, security_protocol, group=None):
+        self.start_kafka(security_protocol, security_protocol)
+        self.start_consumer(security_protocol)
+        consumer_node = self.consumer.nodes[0]
+        wait_until(lambda: self.consumer.alive(consumer_node),
+                   timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
+        kafka_node = self.kafka.nodes[0]
+        if security_protocol is not SecurityConfig.PLAINTEXT:
+            prop_file = str(self.kafka.security_config.client_config())
+            self.logger.debug(prop_file)
+            kafka_node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
+            kafka_node.account.create_file(self.COMMAND_CONFIG_FILE, prop_file)
+
+        # Verify ConsumerGroupCommand lists expected consumer groups
+        enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
+        command_config_file = None
+        if enable_new_consumer:
+            command_config_file = self.COMMAND_CONFIG_FILE
+
+        if group:
+            wait_until(lambda: re.search("topic-consumer-group-command",self.kafka.describe_consumer_group(group=group, node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file)), timeout_sec=10,
+                       err_msg="Timed out waiting to list expected consumer groups.")
+        else:
+            wait_until(lambda: "test-consumer-group" in self.kafka.list_consumer_groups(node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10,
+                       err_msg="Timed out waiting to list expected consumer groups.")
+
+        self.consumer.stop()
+
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+    def test_list_consumer_groups(self, security_protocol='PLAINTEXT'):
+        """
+        Tests if ConsumerGroupCommand is listing correct consumer groups
+        :return: None
+        """
+        self.setup_and_verify(security_protocol)
+
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+    def test_describe_consumer_group(self, security_protocol='PLAINTEXT'):
+        """
+        Tests if ConsumerGroupCommand is describing a consumer group correctly
+        :return: None
+        """
+        self.setup_and_verify(security_protocol, group="test-consumer-group")

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core1/get_offset_shell_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core1/get_offset_shell_test.py b/tests/kafkatest/tests/core1/get_offset_shell_test.py
new file mode 100644
index 0000000..38bd9dc
--- /dev/null
+++ b/tests/kafkatest/tests/core1/get_offset_shell_test.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.utils.util import wait_until
+from ducktape.tests.test import Test
+from kafkatest.services.verifiable_producer import VerifiableProducer
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.security.security_config import SecurityConfig
+
+TOPIC = "topic-get-offset-shell"
+MAX_MESSAGES = 100
+NUM_PARTITIONS = 1
+REPLICATION_FACTOR = 1
+
+class GetOffsetShellTest(Test):
+    """
+    Tests GetOffsetShell tool
+    """
+    def __init__(self, test_context):
+        super(GetOffsetShellTest, self).__init__(test_context)
+        self.num_zk = 1
+        self.num_brokers = 1
+        self.messages_received_count = 0
+        self.topics = {
+            TOPIC: {'partitions': NUM_PARTITIONS, 'replication-factor': REPLICATION_FACTOR}
+        }
+
+        self.zk = ZookeeperService(test_context, self.num_zk)
+
+
+
+    def setUp(self):
+        self.zk.start()
+
+    def start_kafka(self, security_protocol, interbroker_security_protocol):
+        self.kafka = KafkaService(
+            self.test_context, self.num_brokers,
+            self.zk, security_protocol=security_protocol,
+            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
+        self.kafka.start()
+
+    def start_producer(self):
+        # This will produce to kafka cluster
+        self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, max_messages=MAX_MESSAGES)
+        self.producer.start()
+        current_acked = self.producer.num_acked
+        wait_until(lambda: self.producer.num_acked >= current_acked + MAX_MESSAGES, timeout_sec=10,
+                   err_msg="Timeout awaiting messages to be produced and acked")
+
+    def start_consumer(self, security_protocol):
+        enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
+        self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
+                                        consumer_timeout_ms=1000, new_consumer=enable_new_consumer)
+        self.consumer.start()
+
+    def test_get_offset_shell(self, security_protocol='PLAINTEXT'):
+        """
+        Tests if GetOffsetShell is getting offsets correctly
+        :return: None
+        """
+        self.start_kafka(security_protocol, security_protocol)
+        self.start_producer()
+
+        # Assert that offset fetched without any consumers consuming is 0
+        assert self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, 0)
+
+        self.start_consumer(security_protocol)
+
+        node = self.consumer.nodes[0]
+
+        wait_until(lambda: self.consumer.alive(node), timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
+
+        # Assert that offset is correctly indicated by GetOffsetShell tool
+        wait_until(lambda: "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, MAX_MESSAGES) in self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), timeout_sec=10,
+                   err_msg="Timed out waiting to reach expected offset.")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core1/reassign_partitions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core1/reassign_partitions_test.py b/tests/kafkatest/tests/core1/reassign_partitions_test.py
new file mode 100644
index 0000000..850e2aa
--- /dev/null
+++ b/tests/kafkatest/tests/core1/reassign_partitions_test.py
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
+class ReassignPartitionsTest(ProduceConsumeValidateTest):
+    """
+    These tests validate partition reassignment.
+    Create a topic with few partitions, load some data, trigger partition re-assignment with and without broker failure,
+    check that partition re-assignment can complete and there is no data loss.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(ReassignPartitionsTest, self).__init__(test_context=test_context)
+
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk, topics={self.topic: {
+                                                                    "partitions": 20,
+                                                                    "replication-factor": 3,
+                                                                    'configs': {"min.insync.replicas": 2}}
+                                                                })
+        self.num_partitions = 20
+        self.timeout_sec = 60
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+    def setUp(self):
+        self.zk.start()
+
+    def min_cluster_size(self):
+        # Override this since we're adding services outside of the constructor
+        return super(ReassignPartitionsTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+    def clean_bounce_some_brokers(self):
+        """Bounce every other broker"""
+        for node in self.kafka.nodes[::2]:
+            self.kafka.restart_node(node, clean_shutdown=True)
+
+    def reassign_partitions(self, bounce_brokers):
+        partition_info = self.kafka.parse_describe_topic(self.kafka.describe_topic(self.topic))
+        self.logger.debug("Partitions before reassignment:" + str(partition_info))
+
+        # jumble partition assignment in dictionary
+        seed = random.randint(0, 2 ** 31 - 1)
+        self.logger.debug("Jumble partition assignment with seed " + str(seed))
+        random.seed(seed)
+        # The list may still be in order, but that's ok
+        shuffled_list = range(0, self.num_partitions)
+        random.shuffle(shuffled_list)
+
+        for i in range(0, self.num_partitions):
+            partition_info["partitions"][i]["partition"] = shuffled_list[i]
+        self.logger.debug("Jumbled partitions: " + str(partition_info))
+
+        # send reassign partitions command
+        self.kafka.execute_reassign_partitions(partition_info)
+
+        if bounce_brokers:
+            # bounce a few brokers at the same time
+            self.clean_bounce_some_brokers()
+
+        # Wait until finished or timeout
+        wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info), timeout_sec=self.timeout_sec, backoff_sec=.5)
+
+    @parametrize(security_protocol="PLAINTEXT", bounce_brokers=True)
+    @parametrize(security_protocol="PLAINTEXT", bounce_brokers=False)
+    def test_reassign_partitions(self, bounce_brokers, security_protocol):
+        """Reassign partitions tests.
+        Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
+
+            - Produce messages in the background
+            - Consume messages in the background
+            - Reassign partitions
+            - If bounce_brokers is True, also bounce a few brokers while partition re-assignment is in progress
+            - When done reassigning partitions and bouncing brokers, stop producing, and finish consuming
+            - Validate that every acked message was consumed
+        """
+
+        self.kafka.security_protocol = security_protocol
+        self.kafka.interbroker_security_protocol = security_protocol
+        new_consumer = False if  self.kafka.security_protocol == "PLAINTEXT" else True
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
+        self.kafka.start()
+
+        self.run_produce_consume_validate(core_test_action=lambda: self.reassign_partitions(bounce_brokers))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core1/simple_consumer_shell_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core1/simple_consumer_shell_test.py b/tests/kafkatest/tests/core1/simple_consumer_shell_test.py
new file mode 100644
index 0000000..74a7eeb
--- /dev/null
+++ b/tests/kafkatest/tests/core1/simple_consumer_shell_test.py
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.utils.util import wait_until
+from ducktape.tests.test import Test
+from kafkatest.services.simple_consumer_shell import SimpleConsumerShell
+from kafkatest.services.verifiable_producer import VerifiableProducer
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+TOPIC = "topic-simple-consumer-shell"
+MAX_MESSAGES = 100
+NUM_PARTITIONS = 1
+REPLICATION_FACTOR = 1
+
+class SimpleConsumerShellTest(Test):
+    """
+    Tests SimpleConsumerShell tool
+    """
+    def __init__(self, test_context):
+        super(SimpleConsumerShellTest, self).__init__(test_context)
+        self.num_zk = 1
+        self.num_brokers = 1
+        self.messages_received_count = 0
+        self.topics = {
+            TOPIC: {'partitions': NUM_PARTITIONS, 'replication-factor': REPLICATION_FACTOR}
+        }
+
+        self.zk = ZookeeperService(test_context, self.num_zk)
+
+    def setUp(self):
+        self.zk.start()
+
+    def start_kafka(self):
+        self.kafka = KafkaService(
+            self.test_context, self.num_brokers,
+            self.zk, topics=self.topics)
+        self.kafka.start()
+
+    def run_producer(self):
+        # This will produce to kafka cluster
+        self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, max_messages=MAX_MESSAGES)
+        self.producer.start()
+        wait_until(lambda: self.producer.num_acked == MAX_MESSAGES, timeout_sec=10,
+                   err_msg="Timeout awaiting messages to be produced and acked")
+
+    def start_simple_consumer_shell(self):
+        self.simple_consumer_shell = SimpleConsumerShell(self.test_context, 1, self.kafka, TOPIC)
+        self.simple_consumer_shell.start()
+
+    def test_simple_consumer_shell(self):
+        """
+        Tests if SimpleConsumerShell is fetching expected records
+        :return: None
+        """
+        self.start_kafka()
+        self.run_producer()
+        self.start_simple_consumer_shell()
+
+        # Assert that SimpleConsumerShell is fetching expected number of messages
+        wait_until(lambda: self.simple_consumer_shell.get_output().count("\n") == (MAX_MESSAGES + 1), timeout_sec=10,
+                   err_msg="Timed out waiting to receive expected number of messages.")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core1/throttling_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core1/throttling_test.py b/tests/kafkatest/tests/core1/throttling_test.py
new file mode 100644
index 0000000..2e21322
--- /dev/null
+++ b/tests/kafkatest/tests/core1/throttling_test.py
@@ -0,0 +1,173 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import math
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.performance import ProducerPerformanceService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.utils import is_int
+
+
+class ThrottlingTest(ProduceConsumeValidateTest):
+    """Tests throttled partition reassignment. This is essentially similar
+    to the reassign_partitions_test, except that we throttle the reassignment
+    and verify that it takes a sensible amount of time given the throttle
+    and the amount of data being moved.
+
+    Since the correctness is time dependent, this test also simplifies the
+    cluster topology. In particular, we fix the number of brokers, the
+    replication-factor, the number of partitions, the partition size, and
+    the number of partitions being moved so that we can accurately predict
+    the time throttled reassignment should take.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(ThrottlingTest, self).__init__(test_context=test_context)
+
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        # Because we are starting the producer/consumer/validate cycle _after_
+        # seeding the cluster with big data (to test throttling), we need to
+        # Start the consumer from the end of the stream. further, we need to
+        # ensure that the consumer is fully started before the producer starts
+        # so that we don't miss any messages. This timeout ensures the sufficient
+        # condition.
+        self.consumer_init_timeout_sec =  10
+        self.num_brokers = 6
+        self.num_partitions = 3
+        self.kafka = KafkaService(test_context,
+                                  num_nodes=self.num_brokers,
+                                  zk=self.zk,
+                                  topics={
+                                      self.topic: {
+                                          "partitions": self.num_partitions,
+                                          "replication-factor": 2,
+                                          "configs": {
+                                              "segment.bytes": 64 * 1024 * 1024
+                                          }
+                                      }
+                                  })
+        self.producer_throughput = 1000
+        self.timeout_sec = 400
+        self.num_records = 2000
+        self.record_size = 4096 * 100  # 400 KB
+        # 1 MB per partition on average.
+        self.partition_size = (self.num_records * self.record_size) / self.num_partitions
+        self.num_producers = 2
+        self.num_consumers = 1
+        self.throttle = 4 * 1024 * 1024  # 4 MB/s
+
+    def setUp(self):
+        self.zk.start()
+
+    def min_cluster_size(self):
+        # Override this since we're adding services outside of the constructor
+        return super(ThrottlingTest, self).min_cluster_size() +\
+            self.num_producers + self.num_consumers
+
+    def clean_bounce_some_brokers(self):
+        """Bounce every other broker"""
+        for node in self.kafka.nodes[::2]:
+            self.kafka.restart_node(node, clean_shutdown=True)
+
+    def reassign_partitions(self, bounce_brokers, throttle):
+        """This method reassigns partitions using a throttle. It makes an
+        assertion about the minimum amount of time the reassignment should take
+        given the value of the throttle, the number of partitions being moved,
+        and the size of each partition.
+        """
+        partition_info = self.kafka.parse_describe_topic(
+            self.kafka.describe_topic(self.topic))
+        self.logger.debug("Partitions before reassignment:" +
+                          str(partition_info))
+        max_num_moves = 0
+        for i in range(0, self.num_partitions):
+            old_replicas = set(partition_info["partitions"][i]["replicas"])
+            new_part = (i+1) % self.num_partitions
+            new_replicas = set(partition_info["partitions"][new_part]["replicas"])
+            max_num_moves = max(len(new_replicas - old_replicas), max_num_moves)
+            partition_info["partitions"][i]["partition"] = new_part
+        self.logger.debug("Jumbled partitions: " + str(partition_info))
+
+        self.kafka.execute_reassign_partitions(partition_info,
+                                               throttle=throttle)
+        start = time.time()
+        if bounce_brokers:
+            # bounce a few brokers at the same time
+            self.clean_bounce_some_brokers()
+
+        # Wait until finished or timeout
+        size_per_broker = max_num_moves * self.partition_size
+        self.logger.debug("Max amount of data transfer per broker: %fb",
+                          size_per_broker)
+        estimated_throttled_time = math.ceil(float(size_per_broker) /
+                                             self.throttle)
+        estimated_time_with_buffer = estimated_throttled_time * 2
+        self.logger.debug("Waiting %ds for the reassignment to complete",
+                          estimated_time_with_buffer)
+        wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info),
+                   timeout_sec=estimated_time_with_buffer, backoff_sec=.5)
+        stop = time.time()
+        time_taken = stop - start
+        self.logger.debug("Transfer took %d second. Estimated time : %ds",
+                          time_taken,
+                          estimated_throttled_time)
+        assert time_taken >= estimated_throttled_time, \
+            ("Expected rebalance to take at least %ds, but it took %ds" % (
+                estimated_throttled_time,
+                time_taken))
+
+    @parametrize(bounce_brokers=False)
+    @parametrize(bounce_brokers=True)
+    def test_throttled_reassignment(self, bounce_brokers):
+        security_protocol = 'PLAINTEXT'
+        self.kafka.security_protocol = security_protocol
+        self.kafka.interbroker_security_protocol = security_protocol
+
+        producer_id = 'bulk_producer'
+        bulk_producer = ProducerPerformanceService(
+            context=self.test_context, num_nodes=1, kafka=self.kafka,
+            topic=self.topic, num_records=self.num_records,
+            record_size=self.record_size, throughput=-1, client_id=producer_id,
+            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id],
+            jmx_attributes=['outgoing-byte-rate'])
+
+
+        self.producer = VerifiableProducer(context=self.test_context,
+                                           num_nodes=1,
+                                           kafka=self.kafka, topic=self.topic,
+                                           message_validator=is_int,
+                                           throughput=self.producer_throughput)
+
+        self.consumer = ConsoleConsumer(self.test_context,
+                                        self.num_consumers,
+                                        self.kafka,
+                                        self.topic,
+                                        consumer_timeout_ms=60000,
+                                        message_validator=is_int,
+                                        from_beginning=False)
+
+        self.kafka.start()
+        bulk_producer.run()
+        self.run_produce_consume_validate(core_test_action=
+                                          lambda: self.reassign_partitions(bounce_brokers, self.throttle))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core2/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core2/__init__.py b/tests/kafkatest/tests/core2/__init__.py
new file mode 100644
index 0000000..ec20143
--- /dev/null
+++ b/tests/kafkatest/tests/core2/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core2/compatibility_test_new_broker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core2/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core2/compatibility_test_new_broker_test.py
new file mode 100644
index 0000000..d6a0a12
--- /dev/null
+++ b/tests/kafkatest/tests/core2/compatibility_test_new_broker_test.py
@@ -0,0 +1,80 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+from kafkatest.version import LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion
+
+
+# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
+class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
+
+    def __init__(self, test_context):
+        super(ClientCompatibilityTestNewBroker, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+            
+        self.zk.start()
+
+        # Producer and consumer
+        self.producer_throughput = 10000
+        self.num_producers = 1
+        self.num_consumers = 1
+        self.messages_per_producer = 1000
+
+    @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None)
+    @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
+    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=None)
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
+    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"], timestamp_type=None)
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("CreateTime"))
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
+    @parametrize(producer_version=str(LATEST_0_10_0), consumer_version=str(LATEST_0_10_0), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
+    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=str("LogAppendTime"))
+    def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None):
+       
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
+                                                                    "partitions": 3,
+                                                                    "replication-factor": 3,
+                                                                    'configs': {"min.insync.replicas": 2}}})
+        for node in self.kafka.nodes:
+            if timestamp_type is not None:
+                node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
+        self.kafka.start()
+         
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic, throughput=self.producer_throughput,
+                                           message_validator=is_int,
+                                           compression_types=compression_types,
+                                           version=KafkaVersion(producer_version))
+
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+                                        self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
+                                        message_validator=is_int, version=KafkaVersion(consumer_version))
+
+        self.run_produce_consume_validate(lambda: wait_until(
+            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+            timeout_sec=120, backoff_sec=1,
+            err_msg="Producer did not produce all messages in reasonable amount of time"))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/mirror_maker/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/mirror_maker/__init__.py b/tests/kafkatest/tests/mirror_maker/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/mirror_maker/mirror_maker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/mirror_maker/mirror_maker_test.py b/tests/kafkatest/tests/mirror_maker/mirror_maker_test.py
new file mode 100644
index 0000000..afb1972
--- /dev/null
+++ b/tests/kafkatest/tests/mirror_maker/mirror_maker_test.py
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+from ducktape.mark import parametrize, matrix, ignore
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.mirror_maker import MirrorMaker
+from kafkatest.services.security.minikdc import MiniKdc
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+
+import time
+
+
+class TestMirrorMakerService(ProduceConsumeValidateTest):
+    """Sanity checks on mirror maker service class."""
+    def __init__(self, test_context):
+        super(TestMirrorMakerService, self).__init__(test_context)
+
+        self.topic = "topic"
+        self.source_zk = ZookeeperService(test_context, num_nodes=1)
+        self.target_zk = ZookeeperService(test_context, num_nodes=1)
+
+        self.source_kafka = KafkaService(test_context, num_nodes=1, zk=self.source_zk,
+                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
+        self.target_kafka = KafkaService(test_context, num_nodes=1, zk=self.target_zk,
+                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
+        # This will produce to source kafka cluster
+        self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.source_kafka, topic=self.topic,
+                                           throughput=1000)
+        self.mirror_maker = MirrorMaker(test_context, num_nodes=1, source=self.source_kafka, target=self.target_kafka,
+                                        whitelist=self.topic, offset_commit_interval_ms=1000)
+        # This will consume from target kafka cluster
+        self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.target_kafka, topic=self.topic,
+                                        message_validator=is_int, consumer_timeout_ms=60000)
+
+    def setUp(self):
+        # Source cluster
+        self.source_zk.start()
+
+        # Target cluster
+        self.target_zk.start()
+
+    def start_kafka(self, security_protocol):
+        self.source_kafka.security_protocol = security_protocol
+        self.source_kafka.interbroker_security_protocol = security_protocol
+        self.target_kafka.security_protocol = security_protocol
+        self.target_kafka.interbroker_security_protocol = security_protocol
+        if self.source_kafka.security_config.has_sasl_kerberos:
+            minikdc = MiniKdc(self.source_kafka.context, self.source_kafka.nodes + self.target_kafka.nodes)
+            self.source_kafka.minikdc = minikdc
+            self.target_kafka.minikdc = minikdc
+            minikdc.start()
+        self.source_kafka.start()
+        self.target_kafka.start()
+
+    def bounce(self, clean_shutdown=True):
+        """Bounce mirror maker with a clean (kill -15) or hard (kill -9) shutdown"""
+
+        # Wait until messages start appearing in the target cluster
+        wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=15)
+
+        # Wait for at least one offset to be committed.
+        #
+        # This step is necessary to prevent data loss with default mirror maker settings:
+        # currently, if we don't have at least one committed offset,
+        # and we bounce mirror maker, the consumer internals will throw OffsetOutOfRangeException, and the default
+        # auto.offset.reset policy ("largest") will kick in, causing mirrormaker to start consuming from the largest
+        # offset. As a result, any messages produced to the source cluster while mirrormaker was dead won't get
+        # mirrored to the target cluster.
+        # (see https://issues.apache.org/jira/browse/KAFKA-2759)
+        #
+        # This isn't necessary with kill -15 because mirror maker commits its offsets during graceful
+        # shutdown.
+        if not clean_shutdown:
+            time.sleep(self.mirror_maker.offset_commit_interval_ms / 1000.0 + .5)
+
+        for i in range(3):
+            self.logger.info("Bringing mirror maker nodes down...")
+            for node in self.mirror_maker.nodes:
+                self.mirror_maker.stop_node(node, clean_shutdown=clean_shutdown)
+
+            num_consumed = len(self.consumer.messages_consumed[1])
+            self.logger.info("Bringing mirror maker nodes back up...")
+            for node in self.mirror_maker.nodes:
+                self.mirror_maker.start_node(node)
+
+            # Ensure new messages are once again showing up on the target cluster
+            # new consumer requires higher timeout here
+            wait_until(lambda: len(self.consumer.messages_consumed[1]) > num_consumed + 100, timeout_sec=60)
+
+    def wait_for_n_messages(self, n_messages=100):
+        """Wait for a minimum number of messages to be successfully produced."""
+        wait_until(lambda: self.producer.num_acked > n_messages, timeout_sec=10,
+                     err_msg="Producer failed to produce %d messages in a reasonable amount of time." % n_messages)
+
+    @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
+    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], new_consumer=[True])
+    def test_simple_end_to_end(self, security_protocol, new_consumer):
+        """
+        Test end-to-end behavior under non-failure conditions.
+
+        Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster.
+        One is source, and the other is target. Single-node mirror maker mirrors from source to target.
+
+        - Start mirror maker.
+        - Produce a small number of messages to the source cluster.
+        - Consume messages from target.
+        - Verify that number of consumed messages matches the number produced.
+        """
+        self.start_kafka(security_protocol)
+        self.consumer.new_consumer = new_consumer
+
+        self.mirror_maker.new_consumer = new_consumer
+        self.mirror_maker.start()
+
+        mm_node = self.mirror_maker.nodes[0]
+        with mm_node.account.monitor_log(self.mirror_maker.LOG_FILE) as monitor:
+            if new_consumer:
+                monitor.wait_until("Resetting offset for partition", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
+            else:
+                monitor.wait_until("reset fetch offset", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
+
+        self.run_produce_consume_validate(core_test_action=self.wait_for_n_messages)
+        self.mirror_maker.stop()
+
+    @matrix(offsets_storage=["kafka", "zookeeper"], new_consumer=[False], clean_shutdown=[True, False])
+    @matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
+    def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True, security_protocol='PLAINTEXT'):
+        """
+        Test end-to-end behavior under failure conditions.
+
+        Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster.
+        One is source, and the other is target. Single-node mirror maker mirrors from source to target.
+
+        - Start mirror maker.
+        - Produce to source cluster, and consume from target cluster in the background.
+        - Bounce MM process
+        - Verify every message acknowledged by the source producer is consumed by the target consumer
+        """
+        if new_consumer and not clean_shutdown:
+            # Increase timeout on downstream console consumer; mirror maker with new consumer takes extra time
+            # during hard bounce. This is because the restarted mirror maker consumer won't be able to rejoin
+            # the group until the previous session times out
+            self.consumer.consumer_timeout_ms = 60000
+
+        self.start_kafka(security_protocol)
+        self.consumer.new_consumer = new_consumer
+
+        self.mirror_maker.offsets_storage = offsets_storage
+        self.mirror_maker.new_consumer = new_consumer
+        self.mirror_maker.start()
+
+        # Wait until mirror maker has reset fetch offset at least once before continuing with the rest of the test
+        mm_node = self.mirror_maker.nodes[0]
+        with mm_node.account.monitor_log(self.mirror_maker.LOG_FILE) as monitor:
+            if new_consumer:
+                monitor.wait_until("Resetting offset for partition", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
+            else:
+                monitor.wait_until("reset fetch offset", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
+
+        self.run_produce_consume_validate(core_test_action=lambda: self.bounce(clean_shutdown=clean_shutdown))
+        self.mirror_maker.stop()

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/produce_consume_validate.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index 801ccde..3b54ad7 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import traceback
 
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
@@ -102,7 +103,7 @@ class ProduceConsumeValidateTest(Test):
         except BaseException as e:
             for s in self.test_context.services:
                 self.mark_for_collect(s)
-            raise
+            raise Exception(traceback.format_exc(e))
 
     @staticmethod
     def annotate_missing_msgs(missing, acked, consumed, msg):

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/replication/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/replication/__init__.py b/tests/kafkatest/tests/replication/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/replication/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/replication/replication_test.py b/tests/kafkatest/tests/replication/replication_test.py
new file mode 100644
index 0000000..f815034
--- /dev/null
+++ b/tests/kafkatest/tests/replication/replication_test.py
@@ -0,0 +1,154 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+
+from ducktape.mark import matrix
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+
+import signal
+
+def broker_node(test, broker_type):
+    """ Discover node of requested type. For leader type, discovers leader for our topic and partition 0
+    """
+    if broker_type == "leader":
+        node = test.kafka.leader(test.topic, partition=0)
+    elif broker_type == "controller":
+        node = test.kafka.controller()
+    else:
+        raise Exception("Unexpected broker type %s." % (broker_type))
+
+    return node
+
+def clean_shutdown(test, broker_type):
+    """Discover broker node of requested type and shut it down cleanly.
+    """
+    node = broker_node(test, broker_type)
+    test.kafka.signal_node(node, sig=signal.SIGTERM)
+
+
+def hard_shutdown(test, broker_type):
+    """Discover broker node of requested type and shut it down with a hard kill."""
+    node = broker_node(test, broker_type)
+    test.kafka.signal_node(node, sig=signal.SIGKILL)
+
+
+def clean_bounce(test, broker_type):
+    """Chase the leader of one partition and restart it cleanly."""
+    for i in range(5):
+        prev_broker_node = broker_node(test, broker_type)
+        test.kafka.restart_node(prev_broker_node, clean_shutdown=True)
+
+
+def hard_bounce(test, broker_type):
+    """Chase the leader and restart it with a hard kill."""
+    for i in range(5):
+        prev_broker_node = broker_node(test, broker_type)
+        test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL)
+
+        # Since this is a hard kill, we need to make sure the process is down and that
+        # zookeeper has registered the loss by expiring the broker's session timeout.
+
+        wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node),
+                   timeout_sec=test.kafka.zk_session_timeout + 5,
+                   err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account))
+
+        test.kafka.start_node(prev_broker_node)
+
+failures = {
+    "clean_shutdown": clean_shutdown,
+    "hard_shutdown": hard_shutdown,
+    "clean_bounce": clean_bounce,
+    "hard_bounce": hard_bounce
+}
+
+
+class ReplicationTest(ProduceConsumeValidateTest):
+    """
+    Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
+    (foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
+    too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose
+    ordering guarantees.
+
+    Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked,
+    we might exit early if some messages are duplicated (though not an issue here since producer retries==0)
+
+    Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively
+    consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
+    indicator that nothing is left to consume.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(ReplicationTest, self).__init__(test_context=test_context)
+
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
+                                                                    "partitions": 3,
+                                                                    "replication-factor": 3,
+                                                                    'configs': {"min.insync.replicas": 2}}
+                                                                })
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+    def setUp(self):
+        self.zk.start()
+
+    def min_cluster_size(self):
+        """Override this since we're adding services outside of the constructor"""
+        return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+
+    @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
+            broker_type=["leader"],
+            security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"])
+    @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
+            broker_type=["controller"],
+            security_protocol=["PLAINTEXT", "SASL_SSL"])
+    @matrix(failure_mode=["hard_bounce"],
+            broker_type=["leader"],
+            security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"], interbroker_sasl_mechanism=["PLAIN", "GSSAPI"])
+    def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type, client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI"):
+        """Replication tests.
+        These tests verify that replication provides simple durability guarantees by checking that data acked by
+        brokers is still available for consumption in the face of various failure scenarios.
+
+        Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
+
+            - Produce messages in the background
+            - Consume messages in the background
+            - Drive broker failures (shutdown, or bounce repeatedly with kill -15 or kill -9)
+            - When done driving failures, stop producing, and finish consuming
+            - Validate that every acked message was consumed
+        """
+
+        self.kafka.security_protocol = security_protocol
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.client_sasl_mechanism = client_sasl_mechanism
+        self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
+        new_consumer = False if  self.kafka.security_protocol == "PLAINTEXT" else True
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
+        self.kafka.start()
+        
+        self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self, broker_type))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/security1/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/security1/__init__.py b/tests/kafkatest/tests/security1/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/security1/security_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/security1/security_test.py b/tests/kafkatest/tests/security1/security_test.py
new file mode 100644
index 0000000..b6bc656
--- /dev/null
+++ b/tests/kafkatest/tests/security1/security_test.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.security.security_config import SecurityConfig
+from kafkatest.services.security.security_config import SslStores
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import time
+
+class TestSslStores(SslStores):
+    def __init__(self):
+        super(TestSslStores, self).__init__()
+        self.invalid_hostname = False
+        self.generate_ca()
+        self.generate_truststore()
+
+    def hostname(self, node):
+        if (self.invalid_hostname):
+            return "invalidhost"
+        else:
+            return super(TestSslStores, self).hostname(node)
+
+class SecurityTest(ProduceConsumeValidateTest):
+    """
+    These tests validate security features.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(SecurityTest, self).__init__(test_context=test_context)
+
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
+                                                                    "partitions": 2,
+                                                                    "replication-factor": 1}
+                                                                })
+        self.num_partitions = 2
+        self.timeout_sec = 10000
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+    def setUp(self):
+        self.zk.start()
+
+    @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='SSL')
+    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
+    def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol):
+        """
+        Test that invalid hostname in certificate results in connection failures.
+        When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure.
+        When security_protocol=PLAINTEXT and interbroker_security_protocol=SSL, controller connections fail
+        with hostname verification failure. Hence clients are expected to fail with LEADER_NOT_AVAILABLE.
+        """
+
+        self.kafka.security_protocol = security_protocol
+        self.kafka.interbroker_security_protocol = interbroker_security_protocol
+        SecurityConfig.ssl_stores = TestSslStores()
+
+        SecurityConfig.ssl_stores.invalid_hostname = True
+        self.kafka.start()
+        self.create_producer_and_consumer()
+        self.producer.log_level = "TRACE"
+        self.producer.start()
+        self.consumer.start()
+        time.sleep(10)
+        assert self.producer.num_acked == 0, "Messages published successfully, endpoint validation did not fail with invalid hostname"
+        error = 'SSLHandshakeException' if security_protocol is 'SSL' else 'LEADER_NOT_AVAILABLE'
+        for node in self.producer.nodes:
+            node.account.ssh("grep %s %s" % (error, self.producer.LOG_FILE))
+        for node in self.consumer.nodes:
+            node.account.ssh("grep %s %s" % (error, self.consumer.LOG_FILE))
+
+        self.producer.stop()
+        self.consumer.stop()
+        self.producer.log_level = "INFO"
+
+        SecurityConfig.ssl_stores.invalid_hostname = False
+        for node in self.kafka.nodes:
+            self.kafka.restart_node(node, clean_shutdown=True)
+        self.create_producer_and_consumer()
+        self.run_produce_consume_validate()
+
+    def create_producer_and_consumer(self):
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=10000, message_validator=is_int)
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/security1/zookeeper_security_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/security1/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/security1/zookeeper_security_upgrade_test.py
new file mode 100644
index 0000000..0cfdf16
--- /dev/null
+++ b/tests/kafkatest/tests/security1/zookeeper_security_upgrade_test.py
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.services.security.kafka_acls import ACLs
+from kafkatest.utils import is_int
+
+class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
+    """Tests a rolling upgrade for zookeeper.
+    """
+
+    def __init__(self, test_context):
+        super(ZooKeeperSecurityUpgradeTest, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.group = "group"
+        self.producer_throughput = 100
+        self.num_producers = 1
+        self.num_consumers = 1
+        self.acls = ACLs(self.test_context)
+
+        self.zk = ZookeeperService(self.test_context, num_nodes=3)
+
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
+            "partitions": 3,
+            "replication-factor": 3,
+            'configs': {"min.insync.replicas": 2}}})
+
+    def create_producer_and_consumer(self):
+        self.producer = VerifiableProducer(
+            self.test_context, self.num_producers, self.kafka, self.topic,
+            throughput=self.producer_throughput)
+
+        self.consumer = ConsoleConsumer(
+            self.test_context, self.num_consumers, self.kafka, self.topic,
+            consumer_timeout_ms=60000, message_validator=is_int)
+
+        self.consumer.group_id = self.group
+
+    @property
+    def no_sasl(self):
+        return self.kafka.security_protocol == "PLAINTEXT" or self.kafka.security_protocol == "SSL"
+
+    @property
+    def is_secure(self):
+        return self.kafka.security_protocol == "SASL_PLAINTEXT" \
+               or self.kafka.security_protocol == "SSL" \
+               or self.kafka.security_protocol == "SASL_SSL"
+
+    def run_zk_migration(self):
+        # change zk config (auth provider + jaas login)
+        self.zk.kafka_opts = self.zk.security_system_properties
+        self.zk.zk_sasl = True
+        if self.no_sasl:
+            self.kafka.start_minikdc(self.zk.zk_principals)
+        # restart zk
+        for node in self.zk.nodes:
+            self.zk.stop_node(node)
+            self.zk.start_node(node)
+
+        # restart broker with jaas login
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            self.kafka.start_node(node)
+
+        # run migration tool
+        for node in self.zk.nodes:
+            self.zk.zookeeper_migration(node, "secure")
+
+        # restart broker with zookeeper.set.acl=true and acls
+        self.kafka.zk_set_acl = True
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            self.kafka.start_node(node)
+
+    @matrix(security_protocol=["PLAINTEXT","SSL","SASL_SSL","SASL_PLAINTEXT"])
+    def test_zk_security_upgrade(self, security_protocol):
+        self.zk.start()
+        self.kafka.security_protocol = security_protocol
+        self.kafka.interbroker_security_protocol = security_protocol
+
+        # set acls
+        if self.is_secure:
+            self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
+            self.acls.set_acls(security_protocol, self.kafka, self.zk, self.topic, self.group)
+
+        if(self.no_sasl):
+            self.kafka.start()
+        else:
+            self.kafka.start(self.zk.zk_principals)
+
+        #Create Producer and Consumer
+        self.create_producer_and_consumer()
+
+        #Run upgrade
+        self.run_produce_consume_validate(self.run_zk_migration)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/security2/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/security2/__init__.py b/tests/kafkatest/tests/security2/__init__.py
new file mode 100644
index 0000000..e69de29


[3/4] kafka git commit: KAFKA-4345; Run decktape test for each pull request

Posted by sr...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client2/consumer_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client2/consumer_test.py b/tests/kafkatest/tests/client2/consumer_test.py
new file mode 100644
index 0000000..534f65c
--- /dev/null
+++ b/tests/kafkatest/tests/client2/consumer_test.py
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition
+
+import signal
+
+class OffsetValidationTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 1
+
+    def __init__(self, test_context):
+        super(OffsetValidationTest, self).__init__(test_context, num_consumers=3, num_producers=1,
+                                                     num_zk=1, num_brokers=2, topics={
+            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 2 }
+        })
+
+    def rolling_bounce_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+        for _ in range(num_bounces):
+            for node in consumer.nodes:
+                consumer.stop_node(node, clean_shutdown)
+
+                wait_until(lambda: len(consumer.dead_nodes()) == 1,
+                           timeout_sec=self.session_timeout_sec+5,
+                           err_msg="Timed out waiting for the consumer to shutdown")
+
+                consumer.start_node(node)
+
+                self.await_all_members(consumer)
+                self.await_consumed_messages(consumer)
+
+    def bounce_all_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+        for _ in range(num_bounces):
+            for node in consumer.nodes:
+                consumer.stop_node(node, clean_shutdown)
+
+            wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, timeout_sec=10,
+                       err_msg="Timed out waiting for the consumers to shutdown")
+            
+            for node in consumer.nodes:
+                consumer.start_node(node)
+
+            self.await_all_members(consumer)
+            self.await_consumed_messages(consumer)
+
+    def rolling_bounce_brokers(self, consumer, num_bounces=5, clean_shutdown=True):
+        for _ in range(num_bounces):
+            for node in self.kafka.nodes:
+                self.kafka.restart_node(node, clean_shutdown=True)
+                self.await_all_members(consumer)
+                self.await_consumed_messages(consumer)
+
+    def setup_consumer(self, topic, **kwargs):
+        # collect verifiable consumer events since this makes debugging much easier
+        consumer = super(OffsetValidationTest, self).setup_consumer(topic, **kwargs)
+        self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
+        return consumer
+
+    def test_broker_rolling_bounce(self):
+        """
+        Verify correct consumer behavior when the brokers are consecutively restarted.
+
+        Setup: single Kafka cluster with one producer writing messages to a single topic with one
+        partition, an a set of consumers in the same group reading from the same topic.
+
+        - Start a producer which continues producing new messages throughout the test.
+        - Start up the consumers and wait until they've joined the group.
+        - In a loop, restart each broker consecutively, waiting for the group to stabilize between
+          each broker restart.
+        - Verify delivery semantics according to the failure type and that the broker bounces
+          did not cause unexpected group rebalances.
+        """
+        partition = TopicPartition(self.TOPIC, 0)
+        
+        producer = self.setup_producer(self.TOPIC)
+        consumer = self.setup_consumer(self.TOPIC)
+
+        producer.start()
+        self.await_produced_messages(producer)
+
+        consumer.start()
+        self.await_all_members(consumer)
+
+        num_rebalances = consumer.num_rebalances()
+        # TODO: make this test work with hard shutdowns, which probably requires
+        #       pausing before the node is restarted to ensure that any ephemeral
+        #       nodes have time to expire
+        self.rolling_bounce_brokers(consumer, clean_shutdown=True)
+        
+        unexpected_rebalances = consumer.num_rebalances() - num_rebalances
+        assert unexpected_rebalances == 0, \
+            "Broker rolling bounce caused %d unexpected group rebalances" % unexpected_rebalances
+
+        consumer.stop_all()
+
+        assert consumer.current_position(partition) == consumer.total_consumed(), \
+            "Total consumed records did not match consumed position"
+
+    @matrix(clean_shutdown=[True, False], bounce_mode=["all", "rolling"])
+    def test_consumer_bounce(self, clean_shutdown, bounce_mode):
+        """
+        Verify correct consumer behavior when the consumers in the group are consecutively restarted.
+
+        Setup: single Kafka cluster with one producer and a set of consumers in one group.
+
+        - Start a producer which continues producing new messages throughout the test.
+        - Start up the consumers and wait until they've joined the group.
+        - In a loop, restart each consumer, waiting for each one to rejoin the group before
+          restarting the rest.
+        - Verify delivery semantics according to the failure type.
+        """
+        partition = TopicPartition(self.TOPIC, 0)
+        
+        producer = self.setup_producer(self.TOPIC)
+        consumer = self.setup_consumer(self.TOPIC)
+
+        producer.start()
+        self.await_produced_messages(producer)
+
+        consumer.start()
+        self.await_all_members(consumer)
+
+        if bounce_mode == "all":
+            self.bounce_all_consumers(consumer, clean_shutdown=clean_shutdown)
+        else:
+            self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
+                
+        consumer.stop_all()
+        if clean_shutdown:
+            # if the total records consumed matches the current position, we haven't seen any duplicates
+            # this can only be guaranteed with a clean shutdown
+            assert consumer.current_position(partition) == consumer.total_consumed(), \
+                "Total consumed records did not match consumed position"
+        else:
+            # we may have duplicates in a hard failure
+            assert consumer.current_position(partition) <= consumer.total_consumed(), \
+                "Current position greater than the total number of consumed records"
+
+    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
+    def test_consumer_failure(self, clean_shutdown, enable_autocommit):
+        partition = TopicPartition(self.TOPIC, 0)
+        
+        consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
+        producer = self.setup_producer(self.TOPIC)
+
+        consumer.start()
+        self.await_all_members(consumer)
+
+        partition_owner = consumer.owner(partition)
+        assert partition_owner is not None
+
+        # startup the producer and ensure that some records have been written
+        producer.start()
+        self.await_produced_messages(producer)
+
+        # stop the partition owner and await its shutdown
+        consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown)
+        wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers - 1) and consumer.owner(partition) != None,
+                   timeout_sec=self.session_timeout_sec+5, err_msg="Timed out waiting for consumer to close")
+
+        # ensure that the remaining consumer does some work after rebalancing
+        self.await_consumed_messages(consumer, min_messages=1000)
+
+        consumer.stop_all()
+
+        if clean_shutdown:
+            # if the total records consumed matches the current position, we haven't seen any duplicates
+            # this can only be guaranteed with a clean shutdown
+            assert consumer.current_position(partition) == consumer.total_consumed(), \
+                "Total consumed records did not match consumed position"
+        else:
+            # we may have duplicates in a hard failure
+            assert consumer.current_position(partition) <= consumer.total_consumed(), \
+                "Current position greater than the total number of consumed records"
+
+        # if autocommit is not turned on, we can also verify the last committed offset
+        if not enable_autocommit:
+            assert consumer.last_commit(partition) == consumer.current_position(partition), \
+                "Last committed offset did not match last consumed position"
+
+
+    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
+    def test_broker_failure(self, clean_shutdown, enable_autocommit):
+        partition = TopicPartition(self.TOPIC, 0)
+        
+        consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
+        producer = self.setup_producer(self.TOPIC)
+
+        producer.start()
+        consumer.start()
+        self.await_all_members(consumer)
+
+        num_rebalances = consumer.num_rebalances()
+
+        # shutdown one of the brokers
+        # TODO: we need a way to target the coordinator instead of picking arbitrarily
+        self.kafka.signal_node(self.kafka.nodes[0], signal.SIGTERM if clean_shutdown else signal.SIGKILL)
+
+        # ensure that the consumers do some work after the broker failure
+        self.await_consumed_messages(consumer, min_messages=1000)
+
+        # verify that there were no rebalances on failover
+        assert num_rebalances == consumer.num_rebalances(), "Broker failure should not cause a rebalance"
+
+        consumer.stop_all()
+
+        # if the total records consumed matches the current position, we haven't seen any duplicates
+        assert consumer.current_position(partition) == consumer.total_consumed(), \
+            "Total consumed records did not match consumed position"
+
+        # if autocommit is not turned on, we can also verify the last committed offset
+        if not enable_autocommit:
+            assert consumer.last_commit(partition) == consumer.current_position(partition), \
+                "Last committed offset did not match last consumed position"
+
+    def test_group_consumption(self):
+        """
+        Verifies correct group rebalance behavior as consumers are started and stopped. 
+        In particular, this test verifies that the partition is readable after every
+        expected rebalance.
+
+        Setup: single Kafka cluster with a group of consumers reading from one topic
+        with one partition while the verifiable producer writes to it.
+
+        - Start the consumers one by one, verifying consumption after each rebalance
+        - Shutdown the consumers one by one, verifying consumption after each rebalance
+        """
+        consumer = self.setup_consumer(self.TOPIC)
+        producer = self.setup_producer(self.TOPIC)
+
+        partition = TopicPartition(self.TOPIC, 0)
+
+        producer.start()
+
+        for num_started, node in enumerate(consumer.nodes, 1):
+            consumer.start_node(node)
+            self.await_members(consumer, num_started)
+            self.await_consumed_messages(consumer)
+
+        for num_stopped, node in enumerate(consumer.nodes, 1):
+            consumer.stop_node(node)
+
+            if num_stopped < self.num_consumers:
+                self.await_members(consumer, self.num_consumers - num_stopped)
+                self.await_consumed_messages(consumer)
+
+        assert consumer.current_position(partition) == consumer.total_consumed(), \
+            "Total consumed records did not match consumed position"
+
+        assert consumer.last_commit(partition) == consumer.current_position(partition), \
+            "Last committed offset did not match last consumed position"
+
+
+class AssignmentValidationTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 6
+
+    def __init__(self, test_context):
+        super(AssignmentValidationTest, self).__init__(test_context, num_consumers=3, num_producers=0,
+                                                num_zk=1, num_brokers=2, topics={
+            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 },
+        })
+
+    @matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
+                                 "org.apache.kafka.clients.consumer.RoundRobinAssignor"])
+    def test_valid_assignment(self, assignment_strategy):
+        """
+        Verify assignment strategy correctness: each partition is assigned to exactly
+        one consumer instance.
+
+        Setup: single Kafka cluster with a set of consumers in the same group.
+
+        - Start the consumers one by one
+        - Validate assignment after every expected rebalance
+        """
+        consumer = self.setup_consumer(self.TOPIC, assignment_strategy=assignment_strategy)
+        for num_started, node in enumerate(consumer.nodes, 1):
+            consumer.start_node(node)
+            self.await_members(consumer, num_started)
+            assert self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment())
+            

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/__init__.py b/tests/kafkatest/tests/core/__init__.py
deleted file mode 100644
index ec20143..0000000
--- a/tests/kafkatest/tests/core/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
deleted file mode 100644
index d6a0a12..0000000
--- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Copyright 2015 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka import config_property
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-from kafkatest.version import LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion
-
-
-# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
-class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
-
-    def __init__(self, test_context):
-        super(ClientCompatibilityTestNewBroker, self).__init__(test_context=test_context)
-
-    def setUp(self):
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-            
-        self.zk.start()
-
-        # Producer and consumer
-        self.producer_throughput = 10000
-        self.num_producers = 1
-        self.num_consumers = 1
-        self.messages_per_producer = 1000
-
-    @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None)
-    @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=None)
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"], timestamp_type=None)
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("CreateTime"))
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
-    @parametrize(producer_version=str(LATEST_0_10_0), consumer_version=str(LATEST_0_10_0), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=str("LogAppendTime"))
-    def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None):
-       
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
-                                                                    "partitions": 3,
-                                                                    "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}})
-        for node in self.kafka.nodes:
-            if timestamp_type is not None:
-                node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
-        self.kafka.start()
-         
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
-                                           self.topic, throughput=self.producer_throughput,
-                                           message_validator=is_int,
-                                           compression_types=compression_types,
-                                           version=KafkaVersion(producer_version))
-
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
-                                        self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
-                                        message_validator=is_int, version=KafkaVersion(consumer_version))
-
-        self.run_produce_consume_validate(lambda: wait_until(
-            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
-            timeout_sec=120, backoff_sec=1,
-            err_msg="Producer did not produce all messages in reasonable amount of time"))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/consumer_group_command_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/consumer_group_command_test.py b/tests/kafkatest/tests/core/consumer_group_command_test.py
deleted file mode 100644
index c3f59d9..0000000
--- a/tests/kafkatest/tests/core/consumer_group_command_test.py
+++ /dev/null
@@ -1,106 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ducktape.utils.util import wait_until
-from ducktape.tests.test import Test
-from ducktape.mark import matrix
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.security.security_config import SecurityConfig
-
-import os
-import re
-
-TOPIC = "topic-consumer-group-command"
-
-class ConsumerGroupCommandTest(Test):
-    """
-    Tests ConsumerGroupCommand
-    """
-    # Root directory for persistent output
-    PERSISTENT_ROOT = "/mnt/consumer_group_command"
-    COMMAND_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "command.properties")
-
-    def __init__(self, test_context):
-        super(ConsumerGroupCommandTest, self).__init__(test_context)
-        self.num_zk = 1
-        self.num_brokers = 1
-        self.topics = {
-            TOPIC: {'partitions': 1, 'replication-factor': 1}
-        }
-        self.zk = ZookeeperService(test_context, self.num_zk)
-
-    def setUp(self):
-        self.zk.start()
-
-    def start_kafka(self, security_protocol, interbroker_security_protocol):
-        self.kafka = KafkaService(
-            self.test_context, self.num_brokers,
-            self.zk, security_protocol=security_protocol,
-            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
-        self.kafka.start()
-
-    def start_consumer(self, security_protocol):
-        enable_new_consumer = security_protocol == SecurityConfig.SSL
-        self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
-                                        consumer_timeout_ms=None, new_consumer=enable_new_consumer)
-        self.consumer.start()
-
-    def setup_and_verify(self, security_protocol, group=None):
-        self.start_kafka(security_protocol, security_protocol)
-        self.start_consumer(security_protocol)
-        consumer_node = self.consumer.nodes[0]
-        wait_until(lambda: self.consumer.alive(consumer_node),
-                   timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
-        kafka_node = self.kafka.nodes[0]
-        if security_protocol is not SecurityConfig.PLAINTEXT:
-            prop_file = str(self.kafka.security_config.client_config())
-            self.logger.debug(prop_file)
-            kafka_node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
-            kafka_node.account.create_file(self.COMMAND_CONFIG_FILE, prop_file)
-
-        # Verify ConsumerGroupCommand lists expected consumer groups
-        enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
-        command_config_file = None
-        if enable_new_consumer:
-            command_config_file = self.COMMAND_CONFIG_FILE
-
-        if group:
-            wait_until(lambda: re.search("topic-consumer-group-command",self.kafka.describe_consumer_group(group=group, node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file)), timeout_sec=10,
-                       err_msg="Timed out waiting to list expected consumer groups.")
-        else:
-            wait_until(lambda: "test-consumer-group" in self.kafka.list_consumer_groups(node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10,
-                       err_msg="Timed out waiting to list expected consumer groups.")
-
-        self.consumer.stop()
-
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_list_consumer_groups(self, security_protocol='PLAINTEXT'):
-        """
-        Tests if ConsumerGroupCommand is listing correct consumer groups
-        :return: None
-        """
-        self.setup_and_verify(security_protocol)
-
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_describe_consumer_group(self, security_protocol='PLAINTEXT'):
-        """
-        Tests if ConsumerGroupCommand is describing a consumer group correctly
-        :return: None
-        """
-        self.setup_and_verify(security_protocol, group="test-consumer-group")

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/get_offset_shell_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/get_offset_shell_test.py b/tests/kafkatest/tests/core/get_offset_shell_test.py
deleted file mode 100644
index 38bd9dc..0000000
--- a/tests/kafkatest/tests/core/get_offset_shell_test.py
+++ /dev/null
@@ -1,91 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ducktape.utils.util import wait_until
-from ducktape.tests.test import Test
-from kafkatest.services.verifiable_producer import VerifiableProducer
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.security.security_config import SecurityConfig
-
-TOPIC = "topic-get-offset-shell"
-MAX_MESSAGES = 100
-NUM_PARTITIONS = 1
-REPLICATION_FACTOR = 1
-
-class GetOffsetShellTest(Test):
-    """
-    Tests GetOffsetShell tool
-    """
-    def __init__(self, test_context):
-        super(GetOffsetShellTest, self).__init__(test_context)
-        self.num_zk = 1
-        self.num_brokers = 1
-        self.messages_received_count = 0
-        self.topics = {
-            TOPIC: {'partitions': NUM_PARTITIONS, 'replication-factor': REPLICATION_FACTOR}
-        }
-
-        self.zk = ZookeeperService(test_context, self.num_zk)
-
-
-
-    def setUp(self):
-        self.zk.start()
-
-    def start_kafka(self, security_protocol, interbroker_security_protocol):
-        self.kafka = KafkaService(
-            self.test_context, self.num_brokers,
-            self.zk, security_protocol=security_protocol,
-            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
-        self.kafka.start()
-
-    def start_producer(self):
-        # This will produce to kafka cluster
-        self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, max_messages=MAX_MESSAGES)
-        self.producer.start()
-        current_acked = self.producer.num_acked
-        wait_until(lambda: self.producer.num_acked >= current_acked + MAX_MESSAGES, timeout_sec=10,
-                   err_msg="Timeout awaiting messages to be produced and acked")
-
-    def start_consumer(self, security_protocol):
-        enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
-        self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
-                                        consumer_timeout_ms=1000, new_consumer=enable_new_consumer)
-        self.consumer.start()
-
-    def test_get_offset_shell(self, security_protocol='PLAINTEXT'):
-        """
-        Tests if GetOffsetShell is getting offsets correctly
-        :return: None
-        """
-        self.start_kafka(security_protocol, security_protocol)
-        self.start_producer()
-
-        # Assert that offset fetched without any consumers consuming is 0
-        assert self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, 0)
-
-        self.start_consumer(security_protocol)
-
-        node = self.consumer.nodes[0]
-
-        wait_until(lambda: self.consumer.alive(node), timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
-
-        # Assert that offset is correctly indicated by GetOffsetShell tool
-        wait_until(lambda: "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, MAX_MESSAGES) in self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), timeout_sec=10,
-                   err_msg="Timed out waiting to reach expected offset.")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/mirror_maker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/mirror_maker_test.py b/tests/kafkatest/tests/core/mirror_maker_test.py
deleted file mode 100644
index afb1972..0000000
--- a/tests/kafkatest/tests/core/mirror_maker_test.py
+++ /dev/null
@@ -1,179 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize, matrix, ignore
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.mirror_maker import MirrorMaker
-from kafkatest.services.security.minikdc import MiniKdc
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-
-import time
-
-
-class TestMirrorMakerService(ProduceConsumeValidateTest):
-    """Sanity checks on mirror maker service class."""
-    def __init__(self, test_context):
-        super(TestMirrorMakerService, self).__init__(test_context)
-
-        self.topic = "topic"
-        self.source_zk = ZookeeperService(test_context, num_nodes=1)
-        self.target_zk = ZookeeperService(test_context, num_nodes=1)
-
-        self.source_kafka = KafkaService(test_context, num_nodes=1, zk=self.source_zk,
-                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
-        self.target_kafka = KafkaService(test_context, num_nodes=1, zk=self.target_zk,
-                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
-        # This will produce to source kafka cluster
-        self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.source_kafka, topic=self.topic,
-                                           throughput=1000)
-        self.mirror_maker = MirrorMaker(test_context, num_nodes=1, source=self.source_kafka, target=self.target_kafka,
-                                        whitelist=self.topic, offset_commit_interval_ms=1000)
-        # This will consume from target kafka cluster
-        self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.target_kafka, topic=self.topic,
-                                        message_validator=is_int, consumer_timeout_ms=60000)
-
-    def setUp(self):
-        # Source cluster
-        self.source_zk.start()
-
-        # Target cluster
-        self.target_zk.start()
-
-    def start_kafka(self, security_protocol):
-        self.source_kafka.security_protocol = security_protocol
-        self.source_kafka.interbroker_security_protocol = security_protocol
-        self.target_kafka.security_protocol = security_protocol
-        self.target_kafka.interbroker_security_protocol = security_protocol
-        if self.source_kafka.security_config.has_sasl_kerberos:
-            minikdc = MiniKdc(self.source_kafka.context, self.source_kafka.nodes + self.target_kafka.nodes)
-            self.source_kafka.minikdc = minikdc
-            self.target_kafka.minikdc = minikdc
-            minikdc.start()
-        self.source_kafka.start()
-        self.target_kafka.start()
-
-    def bounce(self, clean_shutdown=True):
-        """Bounce mirror maker with a clean (kill -15) or hard (kill -9) shutdown"""
-
-        # Wait until messages start appearing in the target cluster
-        wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=15)
-
-        # Wait for at least one offset to be committed.
-        #
-        # This step is necessary to prevent data loss with default mirror maker settings:
-        # currently, if we don't have at least one committed offset,
-        # and we bounce mirror maker, the consumer internals will throw OffsetOutOfRangeException, and the default
-        # auto.offset.reset policy ("largest") will kick in, causing mirrormaker to start consuming from the largest
-        # offset. As a result, any messages produced to the source cluster while mirrormaker was dead won't get
-        # mirrored to the target cluster.
-        # (see https://issues.apache.org/jira/browse/KAFKA-2759)
-        #
-        # This isn't necessary with kill -15 because mirror maker commits its offsets during graceful
-        # shutdown.
-        if not clean_shutdown:
-            time.sleep(self.mirror_maker.offset_commit_interval_ms / 1000.0 + .5)
-
-        for i in range(3):
-            self.logger.info("Bringing mirror maker nodes down...")
-            for node in self.mirror_maker.nodes:
-                self.mirror_maker.stop_node(node, clean_shutdown=clean_shutdown)
-
-            num_consumed = len(self.consumer.messages_consumed[1])
-            self.logger.info("Bringing mirror maker nodes back up...")
-            for node in self.mirror_maker.nodes:
-                self.mirror_maker.start_node(node)
-
-            # Ensure new messages are once again showing up on the target cluster
-            # new consumer requires higher timeout here
-            wait_until(lambda: len(self.consumer.messages_consumed[1]) > num_consumed + 100, timeout_sec=60)
-
-    def wait_for_n_messages(self, n_messages=100):
-        """Wait for a minimum number of messages to be successfully produced."""
-        wait_until(lambda: self.producer.num_acked > n_messages, timeout_sec=10,
-                     err_msg="Producer failed to produce %d messages in a reasonable amount of time." % n_messages)
-
-    @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
-    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], new_consumer=[True])
-    def test_simple_end_to_end(self, security_protocol, new_consumer):
-        """
-        Test end-to-end behavior under non-failure conditions.
-
-        Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster.
-        One is source, and the other is target. Single-node mirror maker mirrors from source to target.
-
-        - Start mirror maker.
-        - Produce a small number of messages to the source cluster.
-        - Consume messages from target.
-        - Verify that number of consumed messages matches the number produced.
-        """
-        self.start_kafka(security_protocol)
-        self.consumer.new_consumer = new_consumer
-
-        self.mirror_maker.new_consumer = new_consumer
-        self.mirror_maker.start()
-
-        mm_node = self.mirror_maker.nodes[0]
-        with mm_node.account.monitor_log(self.mirror_maker.LOG_FILE) as monitor:
-            if new_consumer:
-                monitor.wait_until("Resetting offset for partition", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
-            else:
-                monitor.wait_until("reset fetch offset", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
-
-        self.run_produce_consume_validate(core_test_action=self.wait_for_n_messages)
-        self.mirror_maker.stop()
-
-    @matrix(offsets_storage=["kafka", "zookeeper"], new_consumer=[False], clean_shutdown=[True, False])
-    @matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
-    def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True, security_protocol='PLAINTEXT'):
-        """
-        Test end-to-end behavior under failure conditions.
-
-        Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster.
-        One is source, and the other is target. Single-node mirror maker mirrors from source to target.
-
-        - Start mirror maker.
-        - Produce to source cluster, and consume from target cluster in the background.
-        - Bounce MM process
-        - Verify every message acknowledged by the source producer is consumed by the target consumer
-        """
-        if new_consumer and not clean_shutdown:
-            # Increase timeout on downstream console consumer; mirror maker with new consumer takes extra time
-            # during hard bounce. This is because the restarted mirror maker consumer won't be able to rejoin
-            # the group until the previous session times out
-            self.consumer.consumer_timeout_ms = 60000
-
-        self.start_kafka(security_protocol)
-        self.consumer.new_consumer = new_consumer
-
-        self.mirror_maker.offsets_storage = offsets_storage
-        self.mirror_maker.new_consumer = new_consumer
-        self.mirror_maker.start()
-
-        # Wait until mirror maker has reset fetch offset at least once before continuing with the rest of the test
-        mm_node = self.mirror_maker.nodes[0]
-        with mm_node.account.monitor_log(self.mirror_maker.LOG_FILE) as monitor:
-            if new_consumer:
-                monitor.wait_until("Resetting offset for partition", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
-            else:
-                monitor.wait_until("reset fetch offset", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
-
-        self.run_produce_consume_validate(core_test_action=lambda: self.bounce(clean_shutdown=clean_shutdown))
-        self.mirror_maker.stop()

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/reassign_partitions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/reassign_partitions_test.py b/tests/kafkatest/tests/core/reassign_partitions_test.py
deleted file mode 100644
index 850e2aa..0000000
--- a/tests/kafkatest/tests/core/reassign_partitions_test.py
+++ /dev/null
@@ -1,110 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-import random
-
-class ReassignPartitionsTest(ProduceConsumeValidateTest):
-    """
-    These tests validate partition reassignment.
-    Create a topic with few partitions, load some data, trigger partition re-assignment with and without broker failure,
-    check that partition re-assignment can complete and there is no data loss.
-    """
-
-    def __init__(self, test_context):
-        """:type test_context: ducktape.tests.test.TestContext"""
-        super(ReassignPartitionsTest, self).__init__(test_context=test_context)
-
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk, topics={self.topic: {
-                                                                    "partitions": 20,
-                                                                    "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}
-                                                                })
-        self.num_partitions = 20
-        self.timeout_sec = 60
-        self.producer_throughput = 1000
-        self.num_producers = 1
-        self.num_consumers = 1
-
-    def setUp(self):
-        self.zk.start()
-
-    def min_cluster_size(self):
-        # Override this since we're adding services outside of the constructor
-        return super(ReassignPartitionsTest, self).min_cluster_size() + self.num_producers + self.num_consumers
-
-    def clean_bounce_some_brokers(self):
-        """Bounce every other broker"""
-        for node in self.kafka.nodes[::2]:
-            self.kafka.restart_node(node, clean_shutdown=True)
-
-    def reassign_partitions(self, bounce_brokers):
-        partition_info = self.kafka.parse_describe_topic(self.kafka.describe_topic(self.topic))
-        self.logger.debug("Partitions before reassignment:" + str(partition_info))
-
-        # jumble partition assignment in dictionary
-        seed = random.randint(0, 2 ** 31 - 1)
-        self.logger.debug("Jumble partition assignment with seed " + str(seed))
-        random.seed(seed)
-        # The list may still be in order, but that's ok
-        shuffled_list = range(0, self.num_partitions)
-        random.shuffle(shuffled_list)
-
-        for i in range(0, self.num_partitions):
-            partition_info["partitions"][i]["partition"] = shuffled_list[i]
-        self.logger.debug("Jumbled partitions: " + str(partition_info))
-
-        # send reassign partitions command
-        self.kafka.execute_reassign_partitions(partition_info)
-
-        if bounce_brokers:
-            # bounce a few brokers at the same time
-            self.clean_bounce_some_brokers()
-
-        # Wait until finished or timeout
-        wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info), timeout_sec=self.timeout_sec, backoff_sec=.5)
-
-    @parametrize(security_protocol="PLAINTEXT", bounce_brokers=True)
-    @parametrize(security_protocol="PLAINTEXT", bounce_brokers=False)
-    def test_reassign_partitions(self, bounce_brokers, security_protocol):
-        """Reassign partitions tests.
-        Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
-
-            - Produce messages in the background
-            - Consume messages in the background
-            - Reassign partitions
-            - If bounce_brokers is True, also bounce a few brokers while partition re-assignment is in progress
-            - When done reassigning partitions and bouncing brokers, stop producing, and finish consuming
-            - Validate that every acked message was consumed
-        """
-
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = security_protocol
-        new_consumer = False if  self.kafka.security_protocol == "PLAINTEXT" else True
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
-        self.kafka.start()
-
-        self.run_produce_consume_validate(core_test_action=lambda: self.reassign_partitions(bounce_brokers))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py
deleted file mode 100644
index f815034..0000000
--- a/tests/kafkatest/tests/core/replication_test.py
+++ /dev/null
@@ -1,154 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.utils.util import wait_until
-
-from ducktape.mark import matrix
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-
-import signal
-
-def broker_node(test, broker_type):
-    """ Discover node of requested type. For leader type, discovers leader for our topic and partition 0
-    """
-    if broker_type == "leader":
-        node = test.kafka.leader(test.topic, partition=0)
-    elif broker_type == "controller":
-        node = test.kafka.controller()
-    else:
-        raise Exception("Unexpected broker type %s." % (broker_type))
-
-    return node
-
-def clean_shutdown(test, broker_type):
-    """Discover broker node of requested type and shut it down cleanly.
-    """
-    node = broker_node(test, broker_type)
-    test.kafka.signal_node(node, sig=signal.SIGTERM)
-
-
-def hard_shutdown(test, broker_type):
-    """Discover broker node of requested type and shut it down with a hard kill."""
-    node = broker_node(test, broker_type)
-    test.kafka.signal_node(node, sig=signal.SIGKILL)
-
-
-def clean_bounce(test, broker_type):
-    """Chase the leader of one partition and restart it cleanly."""
-    for i in range(5):
-        prev_broker_node = broker_node(test, broker_type)
-        test.kafka.restart_node(prev_broker_node, clean_shutdown=True)
-
-
-def hard_bounce(test, broker_type):
-    """Chase the leader and restart it with a hard kill."""
-    for i in range(5):
-        prev_broker_node = broker_node(test, broker_type)
-        test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL)
-
-        # Since this is a hard kill, we need to make sure the process is down and that
-        # zookeeper has registered the loss by expiring the broker's session timeout.
-
-        wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node),
-                   timeout_sec=test.kafka.zk_session_timeout + 5,
-                   err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account))
-
-        test.kafka.start_node(prev_broker_node)
-
-failures = {
-    "clean_shutdown": clean_shutdown,
-    "hard_shutdown": hard_shutdown,
-    "clean_bounce": clean_bounce,
-    "hard_bounce": hard_bounce
-}
-
-
-class ReplicationTest(ProduceConsumeValidateTest):
-    """
-    Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
-    (foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
-    too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose
-    ordering guarantees.
-
-    Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked,
-    we might exit early if some messages are duplicated (though not an issue here since producer retries==0)
-
-    Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively
-    consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
-    indicator that nothing is left to consume.
-    """
-
-    def __init__(self, test_context):
-        """:type test_context: ducktape.tests.test.TestContext"""
-        super(ReplicationTest, self).__init__(test_context=test_context)
-
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
-                                                                    "partitions": 3,
-                                                                    "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}
-                                                                })
-        self.producer_throughput = 1000
-        self.num_producers = 1
-        self.num_consumers = 1
-
-    def setUp(self):
-        self.zk.start()
-
-    def min_cluster_size(self):
-        """Override this since we're adding services outside of the constructor"""
-        return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
-
-
-    @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
-            broker_type=["leader"],
-            security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"])
-    @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
-            broker_type=["controller"],
-            security_protocol=["PLAINTEXT", "SASL_SSL"])
-    @matrix(failure_mode=["hard_bounce"],
-            broker_type=["leader"],
-            security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"], interbroker_sasl_mechanism=["PLAIN", "GSSAPI"])
-    def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type, client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI"):
-        """Replication tests.
-        These tests verify that replication provides simple durability guarantees by checking that data acked by
-        brokers is still available for consumption in the face of various failure scenarios.
-
-        Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
-
-            - Produce messages in the background
-            - Consume messages in the background
-            - Drive broker failures (shutdown, or bounce repeatedly with kill -15 or kill -9)
-            - When done driving failures, stop producing, and finish consuming
-            - Validate that every acked message was consumed
-        """
-
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = security_protocol
-        self.kafka.client_sasl_mechanism = client_sasl_mechanism
-        self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
-        new_consumer = False if  self.kafka.security_protocol == "PLAINTEXT" else True
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
-        self.kafka.start()
-        
-        self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self, broker_type))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
deleted file mode 100644
index 51b2e60..0000000
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ /dev/null
@@ -1,190 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.utils import is_int
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from ducktape.mark import parametrize
-from ducktape.mark import matrix
-from kafkatest.services.security.kafka_acls import ACLs
-import time
-
-
-class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
-    """Tests a rolling upgrade from PLAINTEXT to a secured cluster
-    """
-
-    def __init__(self, test_context):
-        super(TestSecurityRollingUpgrade, self).__init__(test_context=test_context)
-
-    def setUp(self):
-        self.acls = ACLs(self.test_context)
-        self.topic = "test_topic"
-        self.group = "group"
-        self.producer_throughput = 100
-        self.num_producers = 1
-        self.num_consumers = 1
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
-            "partitions": 3,
-            "replication-factor": 3,
-            'configs': {"min.insync.replicas": 2}}})
-        self.zk.start()
-
-    def create_producer_and_consumer(self):
-        self.producer = VerifiableProducer(
-            self.test_context, self.num_producers, self.kafka, self.topic,
-            throughput=self.producer_throughput)
-
-        self.consumer = ConsoleConsumer(
-            self.test_context, self.num_consumers, self.kafka, self.topic,
-            consumer_timeout_ms=60000, message_validator=is_int)
-
-        self.consumer.group_id = "group"
-
-    def bounce(self):
-        self.kafka.start_minikdc()
-        for node in self.kafka.nodes:
-            self.kafka.stop_node(node)
-            self.kafka.start_node(node)
-            time.sleep(10)
-
-    def roll_in_secured_settings(self, client_protocol, broker_protocol):
-
-        # Roll cluster to include inter broker security protocol.
-        self.kafka.interbroker_security_protocol = broker_protocol
-        self.kafka.open_port(client_protocol)
-        self.kafka.open_port(broker_protocol)
-        self.bounce()
-
-        # Roll cluster to disable PLAINTEXT port
-        self.kafka.close_port('PLAINTEXT')
-        self.set_authorizer_and_bounce(client_protocol, broker_protocol)
-
-    def set_authorizer_and_bounce(self, client_protocol, broker_protocol):
-        self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
-        self.acls.set_acls(client_protocol, self.kafka, self.zk, self.topic, self.group)
-        self.acls.set_acls(broker_protocol, self.kafka, self.zk, self.topic, self.group)
-        self.bounce()
-
-    def open_secured_port(self, client_protocol):
-        self.kafka.security_protocol = client_protocol
-        self.kafka.open_port(client_protocol)
-        self.kafka.start_minikdc()
-        self.bounce()
-
-    def add_sasl_mechanism(self, new_client_sasl_mechanism):
-        self.kafka.client_sasl_mechanism = new_client_sasl_mechanism
-        self.kafka.start_minikdc()
-        self.bounce()
-
-    def roll_in_sasl_mechanism(self, security_protocol, new_sasl_mechanism):
-        # Roll cluster to update inter-broker SASL mechanism. This disables the old mechanism.
-        self.kafka.interbroker_sasl_mechanism = new_sasl_mechanism
-        self.bounce()
-
-        # Bounce again with ACLs for new mechanism
-        self.set_authorizer_and_bounce(security_protocol, security_protocol)
-
-    @matrix(client_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"])
-    def test_rolling_upgrade_phase_one(self, client_protocol):
-        """
-        Start with a PLAINTEXT cluster, open a SECURED port, via a rolling upgrade, ensuring we could produce
-        and consume throughout over PLAINTEXT. Finally check we can produce and consume the new secured port.
-        """
-        self.kafka.interbroker_security_protocol = "PLAINTEXT"
-        self.kafka.security_protocol = "PLAINTEXT"
-        self.kafka.start()
-
-        # Create PLAINTEXT producer and consumer
-        self.create_producer_and_consumer()
-
-        # Rolling upgrade, opening a secure protocol, ensuring the Plaintext producer/consumer continues to run
-        self.run_produce_consume_validate(self.open_secured_port, client_protocol)
-
-        # Now we can produce and consume via the secured port
-        self.kafka.security_protocol = client_protocol
-        self.create_producer_and_consumer()
-        self.run_produce_consume_validate(lambda: time.sleep(1))
-
-    @matrix(client_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"], broker_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"])
-    def test_rolling_upgrade_phase_two(self, client_protocol, broker_protocol):
-        """
-        Start with a PLAINTEXT cluster with a second Secured port open (i.e. result of phase one).
-        Start an Producer and Consumer via the SECURED port
-        Incrementally upgrade to add inter-broker be the secure protocol
-        Incrementally upgrade again to add ACLs as well as disabling the PLAINTEXT port
-        Ensure the producer and consumer ran throughout
-        """
-        #Given we have a broker that has both secure and PLAINTEXT ports open
-        self.kafka.security_protocol = client_protocol
-        self.kafka.interbroker_security_protocol = "PLAINTEXT"
-        self.kafka.start()
-
-        #Create Secured Producer and Consumer
-        self.create_producer_and_consumer()
-
-        #Roll in the security protocol. Disable Plaintext. Ensure we can produce and Consume throughout
-        self.run_produce_consume_validate(self.roll_in_secured_settings, client_protocol, broker_protocol)
-
-    @parametrize(new_client_sasl_mechanism='PLAIN')
-    def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism):
-        """
-        Start with a SASL/GSSAPI cluster, add new SASL mechanism, via a rolling upgrade, ensuring we could produce
-        and consume throughout over SASL/GSSAPI. Finally check we can produce and consume using new mechanism.
-        """
-        self.kafka.interbroker_security_protocol = "SASL_SSL"
-        self.kafka.security_protocol = "SASL_SSL"
-        self.kafka.client_sasl_mechanism = "GSSAPI"
-        self.kafka.interbroker_sasl_mechanism = "GSSAPI"
-        self.kafka.start()
-
-        # Create SASL/GSSAPI producer and consumer
-        self.create_producer_and_consumer()
-
-        # Rolling upgrade, adding new SASL mechanism, ensuring the GSSAPI producer/consumer continues to run
-        self.run_produce_consume_validate(self.add_sasl_mechanism, new_client_sasl_mechanism)
-
-        # Now we can produce and consume using the new SASL mechanism
-        self.kafka.client_sasl_mechanism = new_client_sasl_mechanism
-        self.create_producer_and_consumer()
-        self.run_produce_consume_validate(lambda: time.sleep(1))
-
-    @parametrize(new_sasl_mechanism='PLAIN')
-    def test_rolling_upgrade_sasl_mechanism_phase_two(self, new_sasl_mechanism):
-        """
-        Start with a SASL cluster with GSSAPI for inter-broker and a second mechanism for clients (i.e. result of phase one).
-        Start Producer and Consumer using the second mechanism
-        Incrementally upgrade to set inter-broker to the second mechanism and disable GSSAPI
-        Incrementally upgrade again to add ACLs
-        Ensure the producer and consumer run throughout
-        """
-        #Start with a broker that has GSSAPI for inter-broker and a second mechanism for clients
-        self.kafka.security_protocol = "SASL_SSL"
-        self.kafka.interbroker_security_protocol = "SASL_SSL"
-        self.kafka.client_sasl_mechanism = new_sasl_mechanism
-        self.kafka.interbroker_sasl_mechanism = "GSSAPI"
-        self.kafka.start()
-
-        #Create Producer and Consumer using second mechanism
-        self.create_producer_and_consumer()
-
-        #Roll in the second SASL mechanism for inter-broker, disabling first mechanism. Ensure we can produce and consume throughout
-        self.run_produce_consume_validate(self.roll_in_sasl_mechanism, self.kafka.security_protocol, new_sasl_mechanism)
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/security_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py
deleted file mode 100644
index b6bc656..0000000
--- a/tests/kafkatest/tests/core/security_test.py
+++ /dev/null
@@ -1,106 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.services.security.security_config import SslStores
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-import time
-
-class TestSslStores(SslStores):
-    def __init__(self):
-        super(TestSslStores, self).__init__()
-        self.invalid_hostname = False
-        self.generate_ca()
-        self.generate_truststore()
-
-    def hostname(self, node):
-        if (self.invalid_hostname):
-            return "invalidhost"
-        else:
-            return super(TestSslStores, self).hostname(node)
-
-class SecurityTest(ProduceConsumeValidateTest):
-    """
-    These tests validate security features.
-    """
-
-    def __init__(self, test_context):
-        """:type test_context: ducktape.tests.test.TestContext"""
-        super(SecurityTest, self).__init__(test_context=test_context)
-
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
-                                                                    "partitions": 2,
-                                                                    "replication-factor": 1}
-                                                                })
-        self.num_partitions = 2
-        self.timeout_sec = 10000
-        self.producer_throughput = 1000
-        self.num_producers = 1
-        self.num_consumers = 1
-
-    def setUp(self):
-        self.zk.start()
-
-    @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='SSL')
-    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol):
-        """
-        Test that invalid hostname in certificate results in connection failures.
-        When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure.
-        When security_protocol=PLAINTEXT and interbroker_security_protocol=SSL, controller connections fail
-        with hostname verification failure. Hence clients are expected to fail with LEADER_NOT_AVAILABLE.
-        """
-
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = interbroker_security_protocol
-        SecurityConfig.ssl_stores = TestSslStores()
-
-        SecurityConfig.ssl_stores.invalid_hostname = True
-        self.kafka.start()
-        self.create_producer_and_consumer()
-        self.producer.log_level = "TRACE"
-        self.producer.start()
-        self.consumer.start()
-        time.sleep(10)
-        assert self.producer.num_acked == 0, "Messages published successfully, endpoint validation did not fail with invalid hostname"
-        error = 'SSLHandshakeException' if security_protocol is 'SSL' else 'LEADER_NOT_AVAILABLE'
-        for node in self.producer.nodes:
-            node.account.ssh("grep %s %s" % (error, self.producer.LOG_FILE))
-        for node in self.consumer.nodes:
-            node.account.ssh("grep %s %s" % (error, self.consumer.LOG_FILE))
-
-        self.producer.stop()
-        self.consumer.stop()
-        self.producer.log_level = "INFO"
-
-        SecurityConfig.ssl_stores.invalid_hostname = False
-        for node in self.kafka.nodes:
-            self.kafka.restart_node(node, clean_shutdown=True)
-        self.create_producer_and_consumer()
-        self.run_produce_consume_validate()
-
-    def create_producer_and_consumer(self):
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=10000, message_validator=is_int)
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/simple_consumer_shell_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/simple_consumer_shell_test.py b/tests/kafkatest/tests/core/simple_consumer_shell_test.py
deleted file mode 100644
index 74a7eeb..0000000
--- a/tests/kafkatest/tests/core/simple_consumer_shell_test.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ducktape.utils.util import wait_until
-from ducktape.tests.test import Test
-from kafkatest.services.simple_consumer_shell import SimpleConsumerShell
-from kafkatest.services.verifiable_producer import VerifiableProducer
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-TOPIC = "topic-simple-consumer-shell"
-MAX_MESSAGES = 100
-NUM_PARTITIONS = 1
-REPLICATION_FACTOR = 1
-
-class SimpleConsumerShellTest(Test):
-    """
-    Tests SimpleConsumerShell tool
-    """
-    def __init__(self, test_context):
-        super(SimpleConsumerShellTest, self).__init__(test_context)
-        self.num_zk = 1
-        self.num_brokers = 1
-        self.messages_received_count = 0
-        self.topics = {
-            TOPIC: {'partitions': NUM_PARTITIONS, 'replication-factor': REPLICATION_FACTOR}
-        }
-
-        self.zk = ZookeeperService(test_context, self.num_zk)
-
-    def setUp(self):
-        self.zk.start()
-
-    def start_kafka(self):
-        self.kafka = KafkaService(
-            self.test_context, self.num_brokers,
-            self.zk, topics=self.topics)
-        self.kafka.start()
-
-    def run_producer(self):
-        # This will produce to kafka cluster
-        self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, max_messages=MAX_MESSAGES)
-        self.producer.start()
-        wait_until(lambda: self.producer.num_acked == MAX_MESSAGES, timeout_sec=10,
-                   err_msg="Timeout awaiting messages to be produced and acked")
-
-    def start_simple_consumer_shell(self):
-        self.simple_consumer_shell = SimpleConsumerShell(self.test_context, 1, self.kafka, TOPIC)
-        self.simple_consumer_shell.start()
-
-    def test_simple_consumer_shell(self):
-        """
-        Tests if SimpleConsumerShell is fetching expected records
-        :return: None
-        """
-        self.start_kafka()
-        self.run_producer()
-        self.start_simple_consumer_shell()
-
-        # Assert that SimpleConsumerShell is fetching expected number of messages
-        wait_until(lambda: self.simple_consumer_shell.get_output().count("\n") == (MAX_MESSAGES + 1), timeout_sec=10,
-                   err_msg="Timed out waiting to receive expected number of messages.")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/throttling_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py
deleted file mode 100644
index 2e21322..0000000
--- a/tests/kafkatest/tests/core/throttling_test.py
+++ /dev/null
@@ -1,173 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import time
-import math
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.performance import ProducerPerformanceService
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.utils import is_int
-
-
-class ThrottlingTest(ProduceConsumeValidateTest):
-    """Tests throttled partition reassignment. This is essentially similar
-    to the reassign_partitions_test, except that we throttle the reassignment
-    and verify that it takes a sensible amount of time given the throttle
-    and the amount of data being moved.
-
-    Since the correctness is time dependent, this test also simplifies the
-    cluster topology. In particular, we fix the number of brokers, the
-    replication-factor, the number of partitions, the partition size, and
-    the number of partitions being moved so that we can accurately predict
-    the time throttled reassignment should take.
-    """
-
-    def __init__(self, test_context):
-        """:type test_context: ducktape.tests.test.TestContext"""
-        super(ThrottlingTest, self).__init__(test_context=test_context)
-
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        # Because we are starting the producer/consumer/validate cycle _after_
-        # seeding the cluster with big data (to test throttling), we need to
-        # Start the consumer from the end of the stream. further, we need to
-        # ensure that the consumer is fully started before the producer starts
-        # so that we don't miss any messages. This timeout ensures the sufficient
-        # condition.
-        self.consumer_init_timeout_sec =  10
-        self.num_brokers = 6
-        self.num_partitions = 3
-        self.kafka = KafkaService(test_context,
-                                  num_nodes=self.num_brokers,
-                                  zk=self.zk,
-                                  topics={
-                                      self.topic: {
-                                          "partitions": self.num_partitions,
-                                          "replication-factor": 2,
-                                          "configs": {
-                                              "segment.bytes": 64 * 1024 * 1024
-                                          }
-                                      }
-                                  })
-        self.producer_throughput = 1000
-        self.timeout_sec = 400
-        self.num_records = 2000
-        self.record_size = 4096 * 100  # 400 KB
-        # 1 MB per partition on average.
-        self.partition_size = (self.num_records * self.record_size) / self.num_partitions
-        self.num_producers = 2
-        self.num_consumers = 1
-        self.throttle = 4 * 1024 * 1024  # 4 MB/s
-
-    def setUp(self):
-        self.zk.start()
-
-    def min_cluster_size(self):
-        # Override this since we're adding services outside of the constructor
-        return super(ThrottlingTest, self).min_cluster_size() +\
-            self.num_producers + self.num_consumers
-
-    def clean_bounce_some_brokers(self):
-        """Bounce every other broker"""
-        for node in self.kafka.nodes[::2]:
-            self.kafka.restart_node(node, clean_shutdown=True)
-
-    def reassign_partitions(self, bounce_brokers, throttle):
-        """This method reassigns partitions using a throttle. It makes an
-        assertion about the minimum amount of time the reassignment should take
-        given the value of the throttle, the number of partitions being moved,
-        and the size of each partition.
-        """
-        partition_info = self.kafka.parse_describe_topic(
-            self.kafka.describe_topic(self.topic))
-        self.logger.debug("Partitions before reassignment:" +
-                          str(partition_info))
-        max_num_moves = 0
-        for i in range(0, self.num_partitions):
-            old_replicas = set(partition_info["partitions"][i]["replicas"])
-            new_part = (i+1) % self.num_partitions
-            new_replicas = set(partition_info["partitions"][new_part]["replicas"])
-            max_num_moves = max(len(new_replicas - old_replicas), max_num_moves)
-            partition_info["partitions"][i]["partition"] = new_part
-        self.logger.debug("Jumbled partitions: " + str(partition_info))
-
-        self.kafka.execute_reassign_partitions(partition_info,
-                                               throttle=throttle)
-        start = time.time()
-        if bounce_brokers:
-            # bounce a few brokers at the same time
-            self.clean_bounce_some_brokers()
-
-        # Wait until finished or timeout
-        size_per_broker = max_num_moves * self.partition_size
-        self.logger.debug("Max amount of data transfer per broker: %fb",
-                          size_per_broker)
-        estimated_throttled_time = math.ceil(float(size_per_broker) /
-                                             self.throttle)
-        estimated_time_with_buffer = estimated_throttled_time * 2
-        self.logger.debug("Waiting %ds for the reassignment to complete",
-                          estimated_time_with_buffer)
-        wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info),
-                   timeout_sec=estimated_time_with_buffer, backoff_sec=.5)
-        stop = time.time()
-        time_taken = stop - start
-        self.logger.debug("Transfer took %d second. Estimated time : %ds",
-                          time_taken,
-                          estimated_throttled_time)
-        assert time_taken >= estimated_throttled_time, \
-            ("Expected rebalance to take at least %ds, but it took %ds" % (
-                estimated_throttled_time,
-                time_taken))
-
-    @parametrize(bounce_brokers=False)
-    @parametrize(bounce_brokers=True)
-    def test_throttled_reassignment(self, bounce_brokers):
-        security_protocol = 'PLAINTEXT'
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = security_protocol
-
-        producer_id = 'bulk_producer'
-        bulk_producer = ProducerPerformanceService(
-            context=self.test_context, num_nodes=1, kafka=self.kafka,
-            topic=self.topic, num_records=self.num_records,
-            record_size=self.record_size, throughput=-1, client_id=producer_id,
-            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id],
-            jmx_attributes=['outgoing-byte-rate'])
-
-
-        self.producer = VerifiableProducer(context=self.test_context,
-                                           num_nodes=1,
-                                           kafka=self.kafka, topic=self.topic,
-                                           message_validator=is_int,
-                                           throughput=self.producer_throughput)
-
-        self.consumer = ConsoleConsumer(self.test_context,
-                                        self.num_consumers,
-                                        self.kafka,
-                                        self.topic,
-                                        consumer_timeout_ms=60000,
-                                        message_validator=is_int,
-                                        from_beginning=False)
-
-        self.kafka.start()
-        bulk_producer.run()
-        self.run_produce_consume_validate(core_test_action=
-                                          lambda: self.reassign_partitions(bounce_brokers, self.throttle))


[4/4] kafka git commit: KAFKA-4345; Run decktape test for each pull request

Posted by sr...@apache.org.
KAFKA-4345; Run decktape test for each pull request

As of now the ducktape tests that we have for kafka are not run for pull request. We can run these test using travis-ci. Here is a sample run:
https://travis-ci.org/raghavgautam/kafka/builds/170574293

Author: Raghav Kumar Gautam <ra...@apache.org>

Reviewers: Sriharsha Chintalapani <ha...@hortonworks.com>

Closes #2064 from raghavgautam/trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e035fc03
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e035fc03
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e035fc03

Branch: refs/heads/trunk
Commit: e035fc039598127e88f31739458f705290b1fdba
Parents: 724cddb
Author: Raghav Kumar Gautam <ra...@apache.org>
Authored: Wed Nov 23 20:48:58 2016 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Wed Nov 23 20:48:58 2016 -0800

----------------------------------------------------------------------
 .travis.yml                                     |  48 +++
 gradle/wrapper/gradle-wrapper.jar               | Bin 0 -> 52928 bytes
 gradle/wrapper/gradle-wrapper.properties        |   6 +
 tests/README.md                                 |  37 +++
 tests/cluster_file.json                         |  97 ++++++
 tests/kafkatest/directory_layout/kafka_path.py  |  10 +-
 tests/kafkatest/services/mirror_maker.py        |   4 +-
 tests/kafkatest/tests/client/__init__.py        |  14 -
 .../kafkatest/tests/client/compression_test.py  |  85 ------
 .../client/consumer_rolling_upgrade_test.py     |  82 -----
 tests/kafkatest/tests/client/consumer_test.py   | 297 -------------------
 .../tests/client/message_format_change_test.py  |  90 ------
 tests/kafkatest/tests/client/quota_test.py      | 219 --------------
 tests/kafkatest/tests/client1/__init__.py       |   0
 .../client1/consumer_rolling_upgrade_test.py    |  82 +++++
 .../tests/client1/message_format_change_test.py |  90 ++++++
 tests/kafkatest/tests/client1/quota_test.py     | 219 ++++++++++++++
 tests/kafkatest/tests/client2/__init__.py       |  14 +
 .../kafkatest/tests/client2/compression_test.py |  85 ++++++
 tests/kafkatest/tests/client2/consumer_test.py  | 297 +++++++++++++++++++
 tests/kafkatest/tests/core/__init__.py          |  14 -
 .../core/compatibility_test_new_broker_test.py  |  80 -----
 .../tests/core/consumer_group_command_test.py   | 106 -------
 .../tests/core/get_offset_shell_test.py         |  91 ------
 tests/kafkatest/tests/core/mirror_maker_test.py | 179 -----------
 .../tests/core/reassign_partitions_test.py      | 110 -------
 tests/kafkatest/tests/core/replication_test.py  | 154 ----------
 .../tests/core/security_rolling_upgrade_test.py | 190 ------------
 tests/kafkatest/tests/core/security_test.py     | 106 -------
 .../tests/core/simple_consumer_shell_test.py    |  75 -----
 tests/kafkatest/tests/core/throttling_test.py   | 173 -----------
 tests/kafkatest/tests/core/upgrade_test.py      | 128 --------
 .../core/zookeeper_security_upgrade_test.py     | 115 -------
 tests/kafkatest/tests/core1/__init__.py         |   0
 .../tests/core1/consumer_group_command_test.py  | 106 +++++++
 .../tests/core1/get_offset_shell_test.py        |  91 ++++++
 .../tests/core1/reassign_partitions_test.py     | 110 +++++++
 .../tests/core1/simple_consumer_shell_test.py   |  75 +++++
 tests/kafkatest/tests/core1/throttling_test.py  | 173 +++++++++++
 tests/kafkatest/tests/core2/__init__.py         |  14 +
 .../core2/compatibility_test_new_broker_test.py |  80 +++++
 tests/kafkatest/tests/mirror_maker/__init__.py  |   0
 .../tests/mirror_maker/mirror_maker_test.py     | 179 +++++++++++
 .../kafkatest/tests/produce_consume_validate.py |   3 +-
 tests/kafkatest/tests/replication/__init__.py   |   0
 .../tests/replication/replication_test.py       | 154 ++++++++++
 tests/kafkatest/tests/security1/__init__.py     |   0
 .../kafkatest/tests/security1/security_test.py  | 106 +++++++
 .../zookeeper_security_upgrade_test.py          | 115 +++++++
 tests/kafkatest/tests/security2/__init__.py     |   0
 .../security2/security_rolling_upgrade_test.py  | 190 ++++++++++++
 tests/kafkatest/tests/upgrade/__init__.py       |   0
 tests/kafkatest/tests/upgrade/upgrade_test.py   | 128 ++++++++
 tests/travis/Dockerfile                         |  38 +++
 tests/travis/run_tests.sh                       |  58 ++++
 tests/travis/ssh/authorized_keys                |  15 +
 tests/travis/ssh/config                         |  21 ++
 tests/travis/ssh/id_rsa                         |  27 ++
 tests/travis/ssh/id_rsa.pub                     |   1 +
 59 files changed, 2665 insertions(+), 2316 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..fa3ab0b
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,48 @@
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+sudo: required
+dist: trusty
+language: java
+
+# TODO enable failing splits after they have been stablized
+env:
+  - TC_PATHS="tests/kafkatest/tests/client1"
+  - TC_PATHS="tests/kafkatest/tests/client2"
+# - TC_PATHS="tests/kafkatest/tests/connect tests/kafkatest/tests/streams tests/kafkatest/tests/tools"
+# - TC_PATHS="tests/kafkatest/tests/mirror_maker"
+# - TC_PATHS="tests/kafkatest/tests/replication"
+# - TC_PATHS="tests/kafkatest/tests/upgrade"
+  - TC_PATHS="tests/kafkatest/tests/security1"
+# - TC_PATHS="tests/kafkatest/tests/security2"
+# - TC_PATHS="tests/kafkatest/tests/core1"
+  - TC_PATHS="tests/kafkatest/tests/core2"
+
+jdk:
+  - oraclejdk8
+
+before_install:
+
+script:
+  - ./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh
+
+services:
+  - docker
+
+before_cache:
+  - rm -f  $HOME/.gradle/caches/modules-2/modules-2.lock
+  - rm -fr $HOME/.gradle/caches/*/plugin-resolution/
+cache:
+  directories:
+    - "$HOME/.m2/repository"
+    - "$HOME/.gradle/caches/"
+    - "$HOME/.gradle/wrapper/"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/gradle/wrapper/gradle-wrapper.jar
----------------------------------------------------------------------
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
new file mode 100644
index 0000000..6ffa237
Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/gradle/wrapper/gradle-wrapper.properties
----------------------------------------------------------------------
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
new file mode 100644
index 0000000..cde46f5
--- /dev/null
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -0,0 +1,6 @@
+#Fri Oct 07 16:09:33 PDT 2016
+distributionBase=GRADLE_USER_HOME
+distributionPath=wrapper/dists
+zipStoreBase=GRADLE_USER_HOME
+zipStorePath=wrapper/dists
+distributionUrl=https\://services.gradle.org/distributions/gradle-3.0-bin.zip

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/README.md
----------------------------------------------------------------------
diff --git a/tests/README.md b/tests/README.md
index 098922f..9056b99 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -6,6 +6,43 @@ This directory contains Kafka system integration and performance tests.
 (ducktape is a distributed testing framework which provides test runner,
 result reporter and utilities to pull up and tear down services.)
 
+Running tests using docker
+--------------------------
+Docker is used for running kafka system tests on travis-ci. And exactly same setup can be run for development purposes.
+
+* Run all tests
+```
+bash tests/travis/run_tests.sh
+```
+* Run all tests with debug on (warning will produce log of logs)
+```
+_DUCKTAPE_OPTIONS="--debug" bash tests/travis/run_tests.sh | tee debug_logs.txt
+```
+* Run a subset of tests
+```
+TC_PATHS="tests/kafkatest/tests/streams tests/kafkatest/tests/tools" bash tests/travis/run_tests.sh
+```
+
+Examining CI run
+----------------
+* Set BUILD_ID is travis ci's build id. E.g. build id is 169519874 for the following build
+```
+https://travis-ci.org/raghavgautam/kafka/builds/169519874
+```
+
+* Getting number of tests that were actually run
+```
+for id in $(curl -sSL https://api.travis-ci.org/builds/$BUILD_ID | jq '.matrix|map(.id)|.[]'); do curl -sSL "https://api.travis-ci.org/jobs/$id/log.txt?deansi=true" ; done | egrep 'SerialTestRunner.*setting up' | wc
+```
+* Getting number of tests that passed
+```
+for id in $(curl -sSL https://api.travis-ci.org/builds/$BUILD_ID | jq '.matrix|map(.id)|.[]'); do curl -sSL "https://api.travis-ci.org/jobs/$id/log.txt?deansi=true" ; done | egrep 'SerialTestRunner.*PASS' | wc
+```
+* Getting all the logs produced from a run
+```
+for id in $(curl -sSL https://api.travis-ci.org/builds/169519874 | jq '.matrix|map(.id)|.[]'); do curl -sSL "https://api.travis-ci.org/jobs/$id/log.txt?deansi=true" ; done
+```
+
 Local Quickstart
 ----------------
 This quickstart will help you run the Kafka system tests on your local machine. Note this requires bringing up a cluster of virtual machines on your local computer, which is memory intensive; it currently requires around 10G RAM.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/cluster_file.json
----------------------------------------------------------------------
diff --git a/tests/cluster_file.json b/tests/cluster_file.json
new file mode 100644
index 0000000..dbf3f04
--- /dev/null
+++ b/tests/cluster_file.json
@@ -0,0 +1,97 @@
+{
+  "_comment": [
+    "Licensed to the Apache Software Foundation (ASF) under one or more",
+    "contributor license agreements.  See the NOTICE file distributed with",
+    "this work for additional information regarding copyright ownership.",
+    "The ASF licenses this file to You under the Apache License, Version 2.0",
+    "(the \"License\"); you may not use this file except in compliance with",
+    "the License.  You may obtain a copy of the License at",
+    "",
+    "http://www.apache.org/licenses/LICENSE-2.0",
+    "",
+    "Unless required by applicable law or agreed to in writing, software",
+    "distributed under the License is distributed on an \"AS IS\" BASIS,",
+    "WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.",
+    "See the License for the specific language governing permissions and",
+    "limitations under the License."
+  ],
+  "nodes": [
+    {
+      "hostname": "knode02.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode02.knw"
+    },
+    {
+      "hostname": "knode03.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode03.knw"
+    },
+    {
+      "hostname": "knode04.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode04.knw"
+    },
+    {
+      "hostname": "knode05.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode05.knw"
+    },
+    {
+      "hostname": "knode06.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode06.knw"
+    },
+    {
+      "hostname": "knode07.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode07.knw"
+    },
+    {
+      "hostname": "knode08.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode08.knw"
+    },
+    {
+      "hostname": "knode09.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode09.knw"
+    },
+    {
+      "hostname": "knode10.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode10.knw"
+    },
+    {
+      "hostname": "knode11.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode11.knw"
+    },
+    {
+      "hostname": "knode12.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode12.knw"
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/directory_layout/kafka_path.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/directory_layout/kafka_path.py b/tests/kafkatest/directory_layout/kafka_path.py
index 0e60aff..9688174 100644
--- a/tests/kafkatest/directory_layout/kafka_path.py
+++ b/tests/kafkatest/directory_layout/kafka_path.py
@@ -44,11 +44,11 @@ TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME = "tools-dependant-libs"
 
 JARS = {
     "trunk": {
-        CORE_JAR_NAME: "core/build/*/*.jar",
-        CORE_LIBS_JAR_NAME: "core/build/libs/*.jar",
-        CORE_DEPENDANT_TEST_LIBS_JAR_NAME: "core/build/dependant-testlibs/*.jar",
-        TOOLS_JAR_NAME: "tools/build/libs/kafka-tools*.jar",
-        TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME: "tools/build/dependant-libs*/*.jar"
+        CORE_JAR_NAME: "libs/*.jar",
+        CORE_LIBS_JAR_NAME: "libs/*.jar",
+        CORE_DEPENDANT_TEST_LIBS_JAR_NAME: "libs/*.jar",
+        TOOLS_JAR_NAME: "libs/*.jar",
+        TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME: "libs/*.jar"
     }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
index 14af4cf..626a0ff 100644
--- a/tests/kafkatest/services/mirror_maker.py
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -180,13 +180,13 @@ class MirrorMaker(KafkaPathResolverMixin, Service):
         cmd = self.start_cmd(node)
         self.logger.debug("Mirror maker command: %s", cmd)
         node.account.ssh(cmd, allow_fail=False)
-        wait_until(lambda: self.alive(node), timeout_sec=10, backoff_sec=.5,
+        wait_until(lambda: self.alive(node), timeout_sec=30, backoff_sec=.5,
                    err_msg="Mirror maker took to long to start.")
         self.logger.debug("Mirror maker is alive")
 
     def stop_node(self, node, clean_shutdown=True):
         node.account.kill_process("java", allow_fail=True, clean_shutdown=clean_shutdown)
-        wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.5,
+        wait_until(lambda: not self.alive(node), timeout_sec=30, backoff_sec=.5,
                    err_msg="Mirror maker took to long to stop.")
 
     def clean_node(self, node):

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/__init__.py b/tests/kafkatest/tests/client/__init__.py
deleted file mode 100644
index ec20143..0000000
--- a/tests/kafkatest/tests/client/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client/compression_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/compression_test.py b/tests/kafkatest/tests/client/compression_test.py
deleted file mode 100644
index 0de53ae..0000000
--- a/tests/kafkatest/tests/client/compression_test.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int_with_prefix
-
-class CompressionTest(ProduceConsumeValidateTest):
-    """
-    These tests validate produce / consume for compressed topics.
-    """
-
-    def __init__(self, test_context):
-        """:type test_context: ducktape.tests.test.TestContext"""
-        super(CompressionTest, self).__init__(test_context=test_context)
-
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
-                                                                    "partitions": 10,
-                                                                    "replication-factor": 1}})
-        self.num_partitions = 10
-        self.timeout_sec = 60
-        self.producer_throughput = 1000
-        self.num_producers = 4
-        self.messages_per_producer = 1000
-        self.num_consumers = 1
-
-    def setUp(self):
-        self.zk.start()
-
-    def min_cluster_size(self):
-        # Override this since we're adding services outside of the constructor
-        return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers
-
-    @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=True)
-    @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=False)
-    def test_compressed_topic(self, compression_types, new_consumer):
-        """Test produce => consume => validate for compressed topics
-        Setup: 1 zk, 1 kafka node, 1 topic with partitions=10, replication-factor=1
-
-        compression_types parameter gives a list of compression types (or no compression if
-        "none"). Each producer in a VerifiableProducer group (num_producers = 4) will use a
-        compression type from the list based on producer's index in the group.
-
-            - Produce messages in the background
-            - Consume messages in the background
-            - Stop producing, and finish consuming
-            - Validate that every acked message was consumed
-        """
-
-        self.kafka.security_protocol = "PLAINTEXT"
-        self.kafka.interbroker_security_protocol = self.kafka.security_protocol
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
-                                           self.topic, throughput=self.producer_throughput,
-                                           message_validator=is_int_with_prefix,
-                                           compression_types=compression_types)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic,
-                                        new_consumer=new_consumer, consumer_timeout_ms=60000,
-                                        message_validator=is_int_with_prefix)
-        self.kafka.start()
-
-        self.run_produce_consume_validate(lambda: wait_until(
-            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
-            timeout_sec=120, backoff_sec=1,
-            err_msg="Producer did not produce all messages in reasonable amount of time"))
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
deleted file mode 100644
index 3cd3c7c..0000000
--- a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
+++ /dev/null
@@ -1,82 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.utils.util import wait_until
-
-from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
-from kafkatest.services.kafka import TopicPartition
-
-class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
-    TOPIC = "test_topic"
-    NUM_PARTITIONS = 4
-    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
-    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
-
-    def __init__(self, test_context):
-        super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_consumers=2, num_producers=0,
-                                                         num_zk=1, num_brokers=1, topics={
-            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }
-        })
-
-    def _verify_range_assignment(self, consumer):
-        # range assignment should give us two partition sets: (0, 1) and (2, 3)
-        assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()])
-        assert assignment == set([
-            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]),
-            frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])])
-
-    def _verify_roundrobin_assignment(self, consumer):
-        assignment = set([frozenset(x) for x in consumer.current_assignment().values()])
-        assert assignment == set([
-            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
-            frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
-
-    def rolling_update_test(self):
-        """
-        Verify rolling updates of partition assignment strategies works correctly. In this
-        test, we use a rolling restart to change the group's assignment strategy from "range" 
-        to "roundrobin." We verify after every restart that all members are still in the group
-        and that the correct assignment strategy was used.
-        """
-
-        # initialize the consumer using range assignment
-        consumer = self.setup_consumer(self.TOPIC, assignment_strategy=self.RANGE)
-
-        consumer.start()
-        self.await_all_members(consumer)
-        self._verify_range_assignment(consumer)
-
-        # change consumer configuration to prefer round-robin assignment, but still support range assignment
-        consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE
-
-        # restart one of the nodes and verify that we are still using range assignment
-        consumer.stop_node(consumer.nodes[0])
-        consumer.start_node(consumer.nodes[0])
-        self.await_all_members(consumer)
-        self._verify_range_assignment(consumer)
-        
-        # now restart the other node and verify that we have switched to round-robin
-        consumer.stop_node(consumer.nodes[1])
-        consumer.start_node(consumer.nodes[1])
-        self.await_all_members(consumer)
-        self._verify_roundrobin_assignment(consumer)
-
-        # if we want, we can now drop support for range assignment
-        consumer.assignment_strategy = self.ROUND_ROBIN
-        for node in consumer.nodes:
-            consumer.stop_node(node)
-            consumer.start_node(node)
-            self.await_all_members(consumer)
-            self._verify_roundrobin_assignment(consumer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client/consumer_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
deleted file mode 100644
index 534f65c..0000000
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ /dev/null
@@ -1,297 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import matrix
-from ducktape.utils.util import wait_until
-
-from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
-from kafkatest.services.kafka import TopicPartition
-
-import signal
-
-class OffsetValidationTest(VerifiableConsumerTest):
-    TOPIC = "test_topic"
-    NUM_PARTITIONS = 1
-
-    def __init__(self, test_context):
-        super(OffsetValidationTest, self).__init__(test_context, num_consumers=3, num_producers=1,
-                                                     num_zk=1, num_brokers=2, topics={
-            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 2 }
-        })
-
-    def rolling_bounce_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
-        for _ in range(num_bounces):
-            for node in consumer.nodes:
-                consumer.stop_node(node, clean_shutdown)
-
-                wait_until(lambda: len(consumer.dead_nodes()) == 1,
-                           timeout_sec=self.session_timeout_sec+5,
-                           err_msg="Timed out waiting for the consumer to shutdown")
-
-                consumer.start_node(node)
-
-                self.await_all_members(consumer)
-                self.await_consumed_messages(consumer)
-
-    def bounce_all_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
-        for _ in range(num_bounces):
-            for node in consumer.nodes:
-                consumer.stop_node(node, clean_shutdown)
-
-            wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, timeout_sec=10,
-                       err_msg="Timed out waiting for the consumers to shutdown")
-            
-            for node in consumer.nodes:
-                consumer.start_node(node)
-
-            self.await_all_members(consumer)
-            self.await_consumed_messages(consumer)
-
-    def rolling_bounce_brokers(self, consumer, num_bounces=5, clean_shutdown=True):
-        for _ in range(num_bounces):
-            for node in self.kafka.nodes:
-                self.kafka.restart_node(node, clean_shutdown=True)
-                self.await_all_members(consumer)
-                self.await_consumed_messages(consumer)
-
-    def setup_consumer(self, topic, **kwargs):
-        # collect verifiable consumer events since this makes debugging much easier
-        consumer = super(OffsetValidationTest, self).setup_consumer(topic, **kwargs)
-        self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
-        return consumer
-
-    def test_broker_rolling_bounce(self):
-        """
-        Verify correct consumer behavior when the brokers are consecutively restarted.
-
-        Setup: single Kafka cluster with one producer writing messages to a single topic with one
-        partition, an a set of consumers in the same group reading from the same topic.
-
-        - Start a producer which continues producing new messages throughout the test.
-        - Start up the consumers and wait until they've joined the group.
-        - In a loop, restart each broker consecutively, waiting for the group to stabilize between
-          each broker restart.
-        - Verify delivery semantics according to the failure type and that the broker bounces
-          did not cause unexpected group rebalances.
-        """
-        partition = TopicPartition(self.TOPIC, 0)
-        
-        producer = self.setup_producer(self.TOPIC)
-        consumer = self.setup_consumer(self.TOPIC)
-
-        producer.start()
-        self.await_produced_messages(producer)
-
-        consumer.start()
-        self.await_all_members(consumer)
-
-        num_rebalances = consumer.num_rebalances()
-        # TODO: make this test work with hard shutdowns, which probably requires
-        #       pausing before the node is restarted to ensure that any ephemeral
-        #       nodes have time to expire
-        self.rolling_bounce_brokers(consumer, clean_shutdown=True)
-        
-        unexpected_rebalances = consumer.num_rebalances() - num_rebalances
-        assert unexpected_rebalances == 0, \
-            "Broker rolling bounce caused %d unexpected group rebalances" % unexpected_rebalances
-
-        consumer.stop_all()
-
-        assert consumer.current_position(partition) == consumer.total_consumed(), \
-            "Total consumed records did not match consumed position"
-
-    @matrix(clean_shutdown=[True, False], bounce_mode=["all", "rolling"])
-    def test_consumer_bounce(self, clean_shutdown, bounce_mode):
-        """
-        Verify correct consumer behavior when the consumers in the group are consecutively restarted.
-
-        Setup: single Kafka cluster with one producer and a set of consumers in one group.
-
-        - Start a producer which continues producing new messages throughout the test.
-        - Start up the consumers and wait until they've joined the group.
-        - In a loop, restart each consumer, waiting for each one to rejoin the group before
-          restarting the rest.
-        - Verify delivery semantics according to the failure type.
-        """
-        partition = TopicPartition(self.TOPIC, 0)
-        
-        producer = self.setup_producer(self.TOPIC)
-        consumer = self.setup_consumer(self.TOPIC)
-
-        producer.start()
-        self.await_produced_messages(producer)
-
-        consumer.start()
-        self.await_all_members(consumer)
-
-        if bounce_mode == "all":
-            self.bounce_all_consumers(consumer, clean_shutdown=clean_shutdown)
-        else:
-            self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
-                
-        consumer.stop_all()
-        if clean_shutdown:
-            # if the total records consumed matches the current position, we haven't seen any duplicates
-            # this can only be guaranteed with a clean shutdown
-            assert consumer.current_position(partition) == consumer.total_consumed(), \
-                "Total consumed records did not match consumed position"
-        else:
-            # we may have duplicates in a hard failure
-            assert consumer.current_position(partition) <= consumer.total_consumed(), \
-                "Current position greater than the total number of consumed records"
-
-    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
-    def test_consumer_failure(self, clean_shutdown, enable_autocommit):
-        partition = TopicPartition(self.TOPIC, 0)
-        
-        consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
-        producer = self.setup_producer(self.TOPIC)
-
-        consumer.start()
-        self.await_all_members(consumer)
-
-        partition_owner = consumer.owner(partition)
-        assert partition_owner is not None
-
-        # startup the producer and ensure that some records have been written
-        producer.start()
-        self.await_produced_messages(producer)
-
-        # stop the partition owner and await its shutdown
-        consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown)
-        wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers - 1) and consumer.owner(partition) != None,
-                   timeout_sec=self.session_timeout_sec+5, err_msg="Timed out waiting for consumer to close")
-
-        # ensure that the remaining consumer does some work after rebalancing
-        self.await_consumed_messages(consumer, min_messages=1000)
-
-        consumer.stop_all()
-
-        if clean_shutdown:
-            # if the total records consumed matches the current position, we haven't seen any duplicates
-            # this can only be guaranteed with a clean shutdown
-            assert consumer.current_position(partition) == consumer.total_consumed(), \
-                "Total consumed records did not match consumed position"
-        else:
-            # we may have duplicates in a hard failure
-            assert consumer.current_position(partition) <= consumer.total_consumed(), \
-                "Current position greater than the total number of consumed records"
-
-        # if autocommit is not turned on, we can also verify the last committed offset
-        if not enable_autocommit:
-            assert consumer.last_commit(partition) == consumer.current_position(partition), \
-                "Last committed offset did not match last consumed position"
-
-
-    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
-    def test_broker_failure(self, clean_shutdown, enable_autocommit):
-        partition = TopicPartition(self.TOPIC, 0)
-        
-        consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
-        producer = self.setup_producer(self.TOPIC)
-
-        producer.start()
-        consumer.start()
-        self.await_all_members(consumer)
-
-        num_rebalances = consumer.num_rebalances()
-
-        # shutdown one of the brokers
-        # TODO: we need a way to target the coordinator instead of picking arbitrarily
-        self.kafka.signal_node(self.kafka.nodes[0], signal.SIGTERM if clean_shutdown else signal.SIGKILL)
-
-        # ensure that the consumers do some work after the broker failure
-        self.await_consumed_messages(consumer, min_messages=1000)
-
-        # verify that there were no rebalances on failover
-        assert num_rebalances == consumer.num_rebalances(), "Broker failure should not cause a rebalance"
-
-        consumer.stop_all()
-
-        # if the total records consumed matches the current position, we haven't seen any duplicates
-        assert consumer.current_position(partition) == consumer.total_consumed(), \
-            "Total consumed records did not match consumed position"
-
-        # if autocommit is not turned on, we can also verify the last committed offset
-        if not enable_autocommit:
-            assert consumer.last_commit(partition) == consumer.current_position(partition), \
-                "Last committed offset did not match last consumed position"
-
-    def test_group_consumption(self):
-        """
-        Verifies correct group rebalance behavior as consumers are started and stopped. 
-        In particular, this test verifies that the partition is readable after every
-        expected rebalance.
-
-        Setup: single Kafka cluster with a group of consumers reading from one topic
-        with one partition while the verifiable producer writes to it.
-
-        - Start the consumers one by one, verifying consumption after each rebalance
-        - Shutdown the consumers one by one, verifying consumption after each rebalance
-        """
-        consumer = self.setup_consumer(self.TOPIC)
-        producer = self.setup_producer(self.TOPIC)
-
-        partition = TopicPartition(self.TOPIC, 0)
-
-        producer.start()
-
-        for num_started, node in enumerate(consumer.nodes, 1):
-            consumer.start_node(node)
-            self.await_members(consumer, num_started)
-            self.await_consumed_messages(consumer)
-
-        for num_stopped, node in enumerate(consumer.nodes, 1):
-            consumer.stop_node(node)
-
-            if num_stopped < self.num_consumers:
-                self.await_members(consumer, self.num_consumers - num_stopped)
-                self.await_consumed_messages(consumer)
-
-        assert consumer.current_position(partition) == consumer.total_consumed(), \
-            "Total consumed records did not match consumed position"
-
-        assert consumer.last_commit(partition) == consumer.current_position(partition), \
-            "Last committed offset did not match last consumed position"
-
-
-class AssignmentValidationTest(VerifiableConsumerTest):
-    TOPIC = "test_topic"
-    NUM_PARTITIONS = 6
-
-    def __init__(self, test_context):
-        super(AssignmentValidationTest, self).__init__(test_context, num_consumers=3, num_producers=0,
-                                                num_zk=1, num_brokers=2, topics={
-            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 },
-        })
-
-    @matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
-                                 "org.apache.kafka.clients.consumer.RoundRobinAssignor"])
-    def test_valid_assignment(self, assignment_strategy):
-        """
-        Verify assignment strategy correctness: each partition is assigned to exactly
-        one consumer instance.
-
-        Setup: single Kafka cluster with a set of consumers in the same group.
-
-        - Start the consumers one by one
-        - Validate assignment after every expected rebalance
-        """
-        consumer = self.setup_consumer(self.TOPIC, assignment_strategy=assignment_strategy)
-        for num_started, node in enumerate(consumer.nodes, 1):
-            consumer.start_node(node)
-            self.await_members(consumer, num_started)
-            assert self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment())
-            

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client/message_format_change_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/message_format_change_test.py b/tests/kafkatest/tests/client/message_format_change_test.py
deleted file mode 100644
index a57c04b..0000000
--- a/tests/kafkatest/tests/client/message_format_change_test.py
+++ /dev/null
@@ -1,90 +0,0 @@
-# Copyright 2015 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-from kafkatest.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
-
-
-class MessageFormatChangeTest(ProduceConsumeValidateTest):
-
-    def __init__(self, test_context):
-        super(MessageFormatChangeTest, self).__init__(test_context=test_context)
-
-    def setUp(self):
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-            
-        self.zk.start()
-
-        # Producer and consumer
-        self.producer_throughput = 10000
-        self.num_producers = 1
-        self.num_consumers = 1
-        self.messages_per_producer = 100
-
-    def produce_and_consume(self, producer_version, consumer_version, group):
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
-                                           self.topic,
-                                           throughput=self.producer_throughput,
-                                           message_validator=is_int,
-                                           version=KafkaVersion(producer_version))
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
-                                        self.topic, new_consumer=False, consumer_timeout_ms=30000,
-                                        message_validator=is_int, version=KafkaVersion(consumer_version))
-        self.consumer.group_id = group
-        self.run_produce_consume_validate(lambda: wait_until(
-            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
-            timeout_sec=120, backoff_sec=1,
-            err_msg="Producer did not produce all messages in reasonable amount of time"))
-        
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
-    def test_compatibility(self, producer_version, consumer_version):
-        """ This tests performs the following checks:
-        The workload is a mix of 0.9.x and 0.10.x producers and consumers 
-        that produce to and consume from a 0.10.x cluster
-        1. initially the topic is using message format 0.9.0
-        2. change the message format version for topic to 0.10.0 on the fly.
-        3. change the message format version for topic back to 0.9.0 on the fly.
-        - The producers and consumers should not have any issue.
-        - Note that for 0.9.x consumers/producers we only do steps 1 and 2
-        """
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
-                                                                    "partitions": 3,
-                                                                    "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}})
-       
-        self.kafka.start()
-        self.logger.info("First format change to 0.9.0")
-        self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
-        self.produce_and_consume(producer_version, consumer_version, "group1")
-
-        self.logger.info("Second format change to 0.10.0")
-        self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
-        self.produce_and_consume(producer_version, consumer_version, "group2")
-
-        if producer_version == str(TRUNK) and consumer_version == str(TRUNK):
-            self.logger.info("Third format change back to 0.9.0")
-            self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
-            self.produce_and_consume(producer_version, consumer_version, "group3")
-
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
deleted file mode 100644
index 1d31569..0000000
--- a/tests/kafkatest/tests/client/quota_test.py
+++ /dev/null
@@ -1,219 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.tests.test import Test
-from ducktape.mark import matrix, parametrize
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.performance import ProducerPerformanceService
-from kafkatest.services.console_consumer import ConsoleConsumer
-
-class QuotaConfig(object):
-    CLIENT_ID = 'client-id'
-    USER = 'user'
-    USER_CLIENT = '(user, client-id)'
-
-    LARGE_QUOTA = 1000 * 1000 * 1000
-    USER_PRINCIPAL = 'CN=systemtest'
-
-    def __init__(self, quota_type, override_quota, kafka):
-        if quota_type == QuotaConfig.CLIENT_ID:
-            if override_quota:
-                self.client_id = 'overridden_id'
-                self.producer_quota = 3750000
-                self.consumer_quota = 3000000
-                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients', self.client_id])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
-            else:
-                self.client_id = 'default_id'
-                self.producer_quota = 2500000
-                self.consumer_quota = 2000000
-                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', 'overridden_id'])
-        elif quota_type == QuotaConfig.USER:
-            if override_quota:
-                self.client_id = 'some_id'
-                self.producer_quota = 3750000
-                self.consumer_quota = 3000000
-                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', self.client_id])
-            else:
-                self.client_id = 'some_id'
-                self.producer_quota = 2500000
-                self.consumer_quota = 2000000
-                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
-        elif quota_type == QuotaConfig.USER_CLIENT:
-            if override_quota:
-                self.client_id = 'overridden_id'
-                self.producer_quota = 3750000
-                self.consumer_quota = 3000000
-                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', self.client_id])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', self.client_id])
-            else:
-                self.client_id = 'default_id'
-                self.producer_quota = 2500000
-                self.consumer_quota = 2000000
-                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None, 'clients', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
-
-    def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, entity_args):
-        node = kafka.nodes[0]
-        cmd = "%s --zookeeper %s --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d" % \
-              (kafka.path.script("kafka-configs.sh", node), kafka.zk.connect_setting(), producer_byte_rate, consumer_byte_rate)
-        cmd += " --entity-type " + entity_args[0] + self.entity_name_opt(entity_args[1])
-        if len(entity_args) > 2:
-            cmd += " --entity-type " + entity_args[2] + self.entity_name_opt(entity_args[3])
-        node.account.ssh(cmd)
-
-    def entity_name_opt(self, name):
-        return " --entity-default" if name is None else " --entity-name " + name
-
-class QuotaTest(Test):
-    """
-    These tests verify that quota provides expected functionality -- they run
-    producer, broker, and consumer with different clientId and quota configuration and
-    check that the observed throughput is close to the value we expect.
-    """
-
-    def __init__(self, test_context):
-        """:type test_context: ducktape.tests.test.TestContext"""
-        super(QuotaTest, self).__init__(test_context=test_context)
-
-        self.topic = 'test_topic'
-        self.logger.info('use topic ' + self.topic)
-
-        self.maximum_client_deviation_percentage = 100.0
-        self.maximum_broker_deviation_percentage = 5.0
-        self.num_records = 50000
-        self.record_size = 3000
-
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
-                                  security_protocol='SSL', authorizer_class_name='',
-                                  interbroker_security_protocol='SSL',
-                                  topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}},
-                                  jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
-                                                    'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],
-                                  jmx_attributes=['OneMinuteRate'])
-        self.num_producers = 1
-        self.num_consumers = 2
-
-    def setUp(self):
-        self.zk.start()
-        self.kafka.start()
-
-    def min_cluster_size(self):
-        """Override this since we're adding services outside of the constructor"""
-        return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
-
-    @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False])
-    @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
-    def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):
-        self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
-        producer_client_id = self.quota_config.client_id
-        consumer_client_id = self.quota_config.client_id
-
-        # Produce all messages
-        producer = ProducerPerformanceService(
-            self.test_context, producer_num, self.kafka,
-            topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id,
-            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_client_id], jmx_attributes=['outgoing-byte-rate'])
-
-        producer.run()
-
-        # Consume all messages
-        consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
-            consumer_timeout_ms=60000, client_id=consumer_client_id,
-            jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s' % consumer_client_id],
-            jmx_attributes=['bytes-consumed-rate'])
-        consumer.run()
-
-        for idx, messages in consumer.messages_consumed.iteritems():
-            assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx
-
-        success, msg = self.validate(self.kafka, producer, consumer)
-        assert success, msg
-
-    def validate(self, broker, producer, consumer):
-        """
-        For each client_id we validate that:
-        1) number of consumed messages equals number of produced messages
-        2) maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
-        3) maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
-        4) maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
-        5) maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
-        """
-        success = True
-        msg = ''
-
-        self.kafka.read_jmx_output_all_nodes()
-
-        # validate that number of consumed messages equals number of produced messages
-        produced_num = sum([value['records'] for value in producer.results])
-        consumed_num = sum([len(value) for value in consumer.messages_consumed.values()])
-        self.logger.info('producer produced %d messages' % produced_num)
-        self.logger.info('consumer consumed %d messages' % consumed_num)
-        if produced_num != consumed_num:
-            success = False
-            msg += "number of produced messages %d doesn't equal number of consumed messages %d" % (produced_num, consumed_num)
-
-        # validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
-        producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % producer.client_id
-        producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
-        producer_quota_bps = self.quota_config.producer_quota
-        self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps))
-        if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
-            success = False
-            msg += 'maximum producer throughput %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
-                   (producer_maximum_bps, producer_quota_bps, self.maximum_client_deviation_percentage)
-
-        # validate that maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
-        broker_byte_in_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:OneMinuteRate'
-        broker_maximum_byte_in_bps = broker.maximum_jmx_value[broker_byte_in_attribute_name]
-        self.logger.info('broker has maximum byte-in rate %.2f bps with producer quota %.2f bps' %
-                         (broker_maximum_byte_in_bps, producer_quota_bps))
-        if broker_maximum_byte_in_bps > producer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
-            success = False
-            msg += 'maximum broker byte-in rate %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
-                   (broker_maximum_byte_in_bps, producer_quota_bps, self.maximum_broker_deviation_percentage)
-
-        # validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
-        consumer_attribute_name = 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s:bytes-consumed-rate' % consumer.client_id
-        consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name]
-        consumer_quota_bps = self.quota_config.consumer_quota
-        self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f bps' % (consumer_maximum_bps, consumer_quota_bps))
-        if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
-            success = False
-            msg += 'maximum consumer throughput %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
-                   (consumer_maximum_bps, consumer_quota_bps, self.maximum_client_deviation_percentage)
-
-        # validate that maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
-        broker_byte_out_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec:OneMinuteRate'
-        broker_maximum_byte_out_bps = broker.maximum_jmx_value[broker_byte_out_attribute_name]
-        self.logger.info('broker has maximum byte-out rate %.2f bps with consumer quota %.2f bps' %
-                         (broker_maximum_byte_out_bps, consumer_quota_bps))
-        if broker_maximum_byte_out_bps > consumer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
-            success = False
-            msg += 'maximum broker byte-out rate %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
-                   (broker_maximum_byte_out_bps, consumer_quota_bps, self.maximum_broker_deviation_percentage)
-
-        return success, msg
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client1/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client1/__init__.py b/tests/kafkatest/tests/client1/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client1/consumer_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client1/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client1/consumer_rolling_upgrade_test.py
new file mode 100644
index 0000000..3cd3c7c
--- /dev/null
+++ b/tests/kafkatest/tests/client1/consumer_rolling_upgrade_test.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition
+
+class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 4
+    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
+    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
+
+    def __init__(self, test_context):
+        super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_consumers=2, num_producers=0,
+                                                         num_zk=1, num_brokers=1, topics={
+            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }
+        })
+
+    def _verify_range_assignment(self, consumer):
+        # range assignment should give us two partition sets: (0, 1) and (2, 3)
+        assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()])
+        assert assignment == set([
+            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]),
+            frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])])
+
+    def _verify_roundrobin_assignment(self, consumer):
+        assignment = set([frozenset(x) for x in consumer.current_assignment().values()])
+        assert assignment == set([
+            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
+            frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
+
+    def rolling_update_test(self):
+        """
+        Verify rolling updates of partition assignment strategies works correctly. In this
+        test, we use a rolling restart to change the group's assignment strategy from "range" 
+        to "roundrobin." We verify after every restart that all members are still in the group
+        and that the correct assignment strategy was used.
+        """
+
+        # initialize the consumer using range assignment
+        consumer = self.setup_consumer(self.TOPIC, assignment_strategy=self.RANGE)
+
+        consumer.start()
+        self.await_all_members(consumer)
+        self._verify_range_assignment(consumer)
+
+        # change consumer configuration to prefer round-robin assignment, but still support range assignment
+        consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE
+
+        # restart one of the nodes and verify that we are still using range assignment
+        consumer.stop_node(consumer.nodes[0])
+        consumer.start_node(consumer.nodes[0])
+        self.await_all_members(consumer)
+        self._verify_range_assignment(consumer)
+        
+        # now restart the other node and verify that we have switched to round-robin
+        consumer.stop_node(consumer.nodes[1])
+        consumer.start_node(consumer.nodes[1])
+        self.await_all_members(consumer)
+        self._verify_roundrobin_assignment(consumer)
+
+        # if we want, we can now drop support for range assignment
+        consumer.assignment_strategy = self.ROUND_ROBIN
+        for node in consumer.nodes:
+            consumer.stop_node(node)
+            consumer.start_node(node)
+            self.await_all_members(consumer)
+            self._verify_roundrobin_assignment(consumer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client1/message_format_change_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client1/message_format_change_test.py b/tests/kafkatest/tests/client1/message_format_change_test.py
new file mode 100644
index 0000000..a57c04b
--- /dev/null
+++ b/tests/kafkatest/tests/client1/message_format_change_test.py
@@ -0,0 +1,90 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+from kafkatest.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
+
+
+class MessageFormatChangeTest(ProduceConsumeValidateTest):
+
+    def __init__(self, test_context):
+        super(MessageFormatChangeTest, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+            
+        self.zk.start()
+
+        # Producer and consumer
+        self.producer_throughput = 10000
+        self.num_producers = 1
+        self.num_consumers = 1
+        self.messages_per_producer = 100
+
+    def produce_and_consume(self, producer_version, consumer_version, group):
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic,
+                                           throughput=self.producer_throughput,
+                                           message_validator=is_int,
+                                           version=KafkaVersion(producer_version))
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+                                        self.topic, new_consumer=False, consumer_timeout_ms=30000,
+                                        message_validator=is_int, version=KafkaVersion(consumer_version))
+        self.consumer.group_id = group
+        self.run_produce_consume_validate(lambda: wait_until(
+            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+            timeout_sec=120, backoff_sec=1,
+            err_msg="Producer did not produce all messages in reasonable amount of time"))
+        
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
+    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
+    def test_compatibility(self, producer_version, consumer_version):
+        """ This tests performs the following checks:
+        The workload is a mix of 0.9.x and 0.10.x producers and consumers 
+        that produce to and consume from a 0.10.x cluster
+        1. initially the topic is using message format 0.9.0
+        2. change the message format version for topic to 0.10.0 on the fly.
+        3. change the message format version for topic back to 0.9.0 on the fly.
+        - The producers and consumers should not have any issue.
+        - Note that for 0.9.x consumers/producers we only do steps 1 and 2
+        """
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
+                                                                    "partitions": 3,
+                                                                    "replication-factor": 3,
+                                                                    'configs': {"min.insync.replicas": 2}}})
+       
+        self.kafka.start()
+        self.logger.info("First format change to 0.9.0")
+        self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
+        self.produce_and_consume(producer_version, consumer_version, "group1")
+
+        self.logger.info("Second format change to 0.10.0")
+        self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
+        self.produce_and_consume(producer_version, consumer_version, "group2")
+
+        if producer_version == str(TRUNK) and consumer_version == str(TRUNK):
+            self.logger.info("Third format change back to 0.9.0")
+            self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
+            self.produce_and_consume(producer_version, consumer_version, "group3")
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client1/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client1/quota_test.py b/tests/kafkatest/tests/client1/quota_test.py
new file mode 100644
index 0000000..1d31569
--- /dev/null
+++ b/tests/kafkatest/tests/client1/quota_test.py
@@ -0,0 +1,219 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark import matrix, parametrize
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.performance import ProducerPerformanceService
+from kafkatest.services.console_consumer import ConsoleConsumer
+
+class QuotaConfig(object):
+    CLIENT_ID = 'client-id'
+    USER = 'user'
+    USER_CLIENT = '(user, client-id)'
+
+    LARGE_QUOTA = 1000 * 1000 * 1000
+    USER_PRINCIPAL = 'CN=systemtest'
+
+    def __init__(self, quota_type, override_quota, kafka):
+        if quota_type == QuotaConfig.CLIENT_ID:
+            if override_quota:
+                self.client_id = 'overridden_id'
+                self.producer_quota = 3750000
+                self.consumer_quota = 3000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients', self.client_id])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
+            else:
+                self.client_id = 'default_id'
+                self.producer_quota = 2500000
+                self.consumer_quota = 2000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', 'overridden_id'])
+        elif quota_type == QuotaConfig.USER:
+            if override_quota:
+                self.client_id = 'some_id'
+                self.producer_quota = 3750000
+                self.consumer_quota = 3000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', self.client_id])
+            else:
+                self.client_id = 'some_id'
+                self.producer_quota = 2500000
+                self.consumer_quota = 2000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
+        elif quota_type == QuotaConfig.USER_CLIENT:
+            if override_quota:
+                self.client_id = 'overridden_id'
+                self.producer_quota = 3750000
+                self.consumer_quota = 3000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', self.client_id])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', self.client_id])
+            else:
+                self.client_id = 'default_id'
+                self.producer_quota = 2500000
+                self.consumer_quota = 2000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None, 'clients', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
+
+    def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, entity_args):
+        node = kafka.nodes[0]
+        cmd = "%s --zookeeper %s --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d" % \
+              (kafka.path.script("kafka-configs.sh", node), kafka.zk.connect_setting(), producer_byte_rate, consumer_byte_rate)
+        cmd += " --entity-type " + entity_args[0] + self.entity_name_opt(entity_args[1])
+        if len(entity_args) > 2:
+            cmd += " --entity-type " + entity_args[2] + self.entity_name_opt(entity_args[3])
+        node.account.ssh(cmd)
+
+    def entity_name_opt(self, name):
+        return " --entity-default" if name is None else " --entity-name " + name
+
+class QuotaTest(Test):
+    """
+    These tests verify that quota provides expected functionality -- they run
+    producer, broker, and consumer with different clientId and quota configuration and
+    check that the observed throughput is close to the value we expect.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(QuotaTest, self).__init__(test_context=test_context)
+
+        self.topic = 'test_topic'
+        self.logger.info('use topic ' + self.topic)
+
+        self.maximum_client_deviation_percentage = 100.0
+        self.maximum_broker_deviation_percentage = 5.0
+        self.num_records = 50000
+        self.record_size = 3000
+
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
+                                  security_protocol='SSL', authorizer_class_name='',
+                                  interbroker_security_protocol='SSL',
+                                  topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}},
+                                  jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
+                                                    'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],
+                                  jmx_attributes=['OneMinuteRate'])
+        self.num_producers = 1
+        self.num_consumers = 2
+
+    def setUp(self):
+        self.zk.start()
+        self.kafka.start()
+
+    def min_cluster_size(self):
+        """Override this since we're adding services outside of the constructor"""
+        return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+    @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False])
+    @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
+    def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):
+        self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
+        producer_client_id = self.quota_config.client_id
+        consumer_client_id = self.quota_config.client_id
+
+        # Produce all messages
+        producer = ProducerPerformanceService(
+            self.test_context, producer_num, self.kafka,
+            topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id,
+            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_client_id], jmx_attributes=['outgoing-byte-rate'])
+
+        producer.run()
+
+        # Consume all messages
+        consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
+            consumer_timeout_ms=60000, client_id=consumer_client_id,
+            jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s' % consumer_client_id],
+            jmx_attributes=['bytes-consumed-rate'])
+        consumer.run()
+
+        for idx, messages in consumer.messages_consumed.iteritems():
+            assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx
+
+        success, msg = self.validate(self.kafka, producer, consumer)
+        assert success, msg
+
+    def validate(self, broker, producer, consumer):
+        """
+        For each client_id we validate that:
+        1) number of consumed messages equals number of produced messages
+        2) maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
+        3) maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
+        4) maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
+        5) maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
+        """
+        success = True
+        msg = ''
+
+        self.kafka.read_jmx_output_all_nodes()
+
+        # validate that number of consumed messages equals number of produced messages
+        produced_num = sum([value['records'] for value in producer.results])
+        consumed_num = sum([len(value) for value in consumer.messages_consumed.values()])
+        self.logger.info('producer produced %d messages' % produced_num)
+        self.logger.info('consumer consumed %d messages' % consumed_num)
+        if produced_num != consumed_num:
+            success = False
+            msg += "number of produced messages %d doesn't equal number of consumed messages %d" % (produced_num, consumed_num)
+
+        # validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
+        producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % producer.client_id
+        producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
+        producer_quota_bps = self.quota_config.producer_quota
+        self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps))
+        if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum producer throughput %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
+                   (producer_maximum_bps, producer_quota_bps, self.maximum_client_deviation_percentage)
+
+        # validate that maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
+        broker_byte_in_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:OneMinuteRate'
+        broker_maximum_byte_in_bps = broker.maximum_jmx_value[broker_byte_in_attribute_name]
+        self.logger.info('broker has maximum byte-in rate %.2f bps with producer quota %.2f bps' %
+                         (broker_maximum_byte_in_bps, producer_quota_bps))
+        if broker_maximum_byte_in_bps > producer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum broker byte-in rate %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
+                   (broker_maximum_byte_in_bps, producer_quota_bps, self.maximum_broker_deviation_percentage)
+
+        # validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
+        consumer_attribute_name = 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s:bytes-consumed-rate' % consumer.client_id
+        consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name]
+        consumer_quota_bps = self.quota_config.consumer_quota
+        self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f bps' % (consumer_maximum_bps, consumer_quota_bps))
+        if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum consumer throughput %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
+                   (consumer_maximum_bps, consumer_quota_bps, self.maximum_client_deviation_percentage)
+
+        # validate that maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
+        broker_byte_out_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec:OneMinuteRate'
+        broker_maximum_byte_out_bps = broker.maximum_jmx_value[broker_byte_out_attribute_name]
+        self.logger.info('broker has maximum byte-out rate %.2f bps with consumer quota %.2f bps' %
+                         (broker_maximum_byte_out_bps, consumer_quota_bps))
+        if broker_maximum_byte_out_bps > consumer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum broker byte-out rate %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
+                   (broker_maximum_byte_out_bps, consumer_quota_bps, self.maximum_broker_deviation_percentage)
+
+        return success, msg
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client2/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client2/__init__.py b/tests/kafkatest/tests/client2/__init__.py
new file mode 100644
index 0000000..ec20143
--- /dev/null
+++ b/tests/kafkatest/tests/client2/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client2/compression_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client2/compression_test.py b/tests/kafkatest/tests/client2/compression_test.py
new file mode 100644
index 0000000..0de53ae
--- /dev/null
+++ b/tests/kafkatest/tests/client2/compression_test.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int_with_prefix
+
+class CompressionTest(ProduceConsumeValidateTest):
+    """
+    These tests validate produce / consume for compressed topics.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(CompressionTest, self).__init__(test_context=test_context)
+
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
+                                                                    "partitions": 10,
+                                                                    "replication-factor": 1}})
+        self.num_partitions = 10
+        self.timeout_sec = 60
+        self.producer_throughput = 1000
+        self.num_producers = 4
+        self.messages_per_producer = 1000
+        self.num_consumers = 1
+
+    def setUp(self):
+        self.zk.start()
+
+    def min_cluster_size(self):
+        # Override this since we're adding services outside of the constructor
+        return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+    @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=True)
+    @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=False)
+    def test_compressed_topic(self, compression_types, new_consumer):
+        """Test produce => consume => validate for compressed topics
+        Setup: 1 zk, 1 kafka node, 1 topic with partitions=10, replication-factor=1
+
+        compression_types parameter gives a list of compression types (or no compression if
+        "none"). Each producer in a VerifiableProducer group (num_producers = 4) will use a
+        compression type from the list based on producer's index in the group.
+
+            - Produce messages in the background
+            - Consume messages in the background
+            - Stop producing, and finish consuming
+            - Validate that every acked message was consumed
+        """
+
+        self.kafka.security_protocol = "PLAINTEXT"
+        self.kafka.interbroker_security_protocol = self.kafka.security_protocol
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic, throughput=self.producer_throughput,
+                                           message_validator=is_int_with_prefix,
+                                           compression_types=compression_types)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic,
+                                        new_consumer=new_consumer, consumer_timeout_ms=60000,
+                                        message_validator=is_int_with_prefix)
+        self.kafka.start()
+
+        self.run_produce_consume_validate(lambda: wait_until(
+            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+            timeout_sec=120, backoff_sec=1,
+            err_msg="Producer did not produce all messages in reasonable amount of time"))
+