You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/11/29 17:11:58 UTC
[4/4] kafka git commit: Revert "KAFKA-4345;
Run decktape test for each pull request"
Revert "KAFKA-4345; Run decktape test for each pull request"
This reverts commit e035fc039598127e88f31739458f705290b1fdba for the
following reasons:
1. License files are missing causing local builds to fail during the
rat task (rat is not being run in Jenkins for some reason, filed
KAFKA-4459 for that)
2. It renames a number of system test files when there's a better
way to achieve the goal of running a subset of system tests to stay
under the Travis limit.
3. It adds the gradle wrapper binary even though this was removed
intentionally a while back.
A new PR will be submitted for KAFKA-4345 without the undesired
changes.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jason Gustafson <ja...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #2187 from ijuma/kafka-4345-revert
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a5d28149
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a5d28149
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a5d28149
Branch: refs/heads/trunk
Commit: a5d28149fb67ab68d9de7d8de63c0de93b80a82c
Parents: 3e3b7a0
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Nov 29 09:11:21 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Nov 29 09:11:21 2016 -0800
----------------------------------------------------------------------
.travis.yml | 48 ---
gradle/wrapper/gradle-wrapper.jar | Bin 52928 -> 0 bytes
gradle/wrapper/gradle-wrapper.properties | 6 -
tests/README.md | 37 ---
tests/cluster_file.json | 97 ------
tests/kafkatest/directory_layout/kafka_path.py | 10 +-
tests/kafkatest/services/mirror_maker.py | 4 +-
tests/kafkatest/tests/client/__init__.py | 14 +
.../kafkatest/tests/client/compression_test.py | 85 ++++++
.../client/consumer_rolling_upgrade_test.py | 82 +++++
tests/kafkatest/tests/client/consumer_test.py | 297 +++++++++++++++++++
.../tests/client/message_format_change_test.py | 90 ++++++
tests/kafkatest/tests/client/quota_test.py | 219 ++++++++++++++
tests/kafkatest/tests/client1/__init__.py | 0
.../client1/consumer_rolling_upgrade_test.py | 82 -----
.../tests/client1/message_format_change_test.py | 90 ------
tests/kafkatest/tests/client1/quota_test.py | 219 --------------
tests/kafkatest/tests/client2/__init__.py | 14 -
.../kafkatest/tests/client2/compression_test.py | 85 ------
tests/kafkatest/tests/client2/consumer_test.py | 297 -------------------
tests/kafkatest/tests/core/__init__.py | 14 +
.../core/compatibility_test_new_broker_test.py | 80 +++++
.../tests/core/consumer_group_command_test.py | 106 +++++++
.../tests/core/get_offset_shell_test.py | 91 ++++++
tests/kafkatest/tests/core/mirror_maker_test.py | 179 +++++++++++
.../tests/core/reassign_partitions_test.py | 110 +++++++
tests/kafkatest/tests/core/replication_test.py | 154 ++++++++++
.../tests/core/security_rolling_upgrade_test.py | 190 ++++++++++++
tests/kafkatest/tests/core/security_test.py | 106 +++++++
.../tests/core/simple_consumer_shell_test.py | 75 +++++
tests/kafkatest/tests/core/throttling_test.py | 173 +++++++++++
tests/kafkatest/tests/core/upgrade_test.py | 128 ++++++++
.../core/zookeeper_security_upgrade_test.py | 115 +++++++
tests/kafkatest/tests/core1/__init__.py | 0
.../tests/core1/consumer_group_command_test.py | 106 -------
.../tests/core1/get_offset_shell_test.py | 91 ------
.../tests/core1/reassign_partitions_test.py | 110 -------
.../tests/core1/simple_consumer_shell_test.py | 75 -----
tests/kafkatest/tests/core1/throttling_test.py | 173 -----------
tests/kafkatest/tests/core2/__init__.py | 14 -
.../core2/compatibility_test_new_broker_test.py | 80 -----
tests/kafkatest/tests/mirror_maker/__init__.py | 0
.../tests/mirror_maker/mirror_maker_test.py | 179 -----------
.../kafkatest/tests/produce_consume_validate.py | 3 +-
tests/kafkatest/tests/replication/__init__.py | 0
.../tests/replication/replication_test.py | 154 ----------
tests/kafkatest/tests/security1/__init__.py | 0
.../kafkatest/tests/security1/security_test.py | 106 -------
.../zookeeper_security_upgrade_test.py | 115 -------
tests/kafkatest/tests/security2/__init__.py | 0
.../security2/security_rolling_upgrade_test.py | 190 ------------
tests/kafkatest/tests/upgrade/__init__.py | 0
tests/kafkatest/tests/upgrade/upgrade_test.py | 128 --------
tests/travis/Dockerfile | 38 ---
tests/travis/run_tests.sh | 58 ----
tests/travis/ssh/authorized_keys | 15 -
tests/travis/ssh/config | 21 --
tests/travis/ssh/id_rsa | 27 --
tests/travis/ssh/id_rsa.pub | 1 -
59 files changed, 2316 insertions(+), 2665 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index fa3ab0b..0000000
--- a/.travis.yml
+++ /dev/null
@@ -1,48 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-sudo: required
-dist: trusty
-language: java
-
-# TODO enable failing splits after they have been stablized
-env:
- - TC_PATHS="tests/kafkatest/tests/client1"
- - TC_PATHS="tests/kafkatest/tests/client2"
-# - TC_PATHS="tests/kafkatest/tests/connect tests/kafkatest/tests/streams tests/kafkatest/tests/tools"
-# - TC_PATHS="tests/kafkatest/tests/mirror_maker"
-# - TC_PATHS="tests/kafkatest/tests/replication"
-# - TC_PATHS="tests/kafkatest/tests/upgrade"
- - TC_PATHS="tests/kafkatest/tests/security1"
-# - TC_PATHS="tests/kafkatest/tests/security2"
-# - TC_PATHS="tests/kafkatest/tests/core1"
- - TC_PATHS="tests/kafkatest/tests/core2"
-
-jdk:
- - oraclejdk8
-
-before_install:
-
-script:
- - ./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh
-
-services:
- - docker
-
-before_cache:
- - rm -f $HOME/.gradle/caches/modules-2/modules-2.lock
- - rm -fr $HOME/.gradle/caches/*/plugin-resolution/
-cache:
- directories:
- - "$HOME/.m2/repository"
- - "$HOME/.gradle/caches/"
- - "$HOME/.gradle/wrapper/"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/gradle/wrapper/gradle-wrapper.jar
----------------------------------------------------------------------
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
deleted file mode 100644
index 6ffa237..0000000
Binary files a/gradle/wrapper/gradle-wrapper.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/gradle/wrapper/gradle-wrapper.properties
----------------------------------------------------------------------
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
deleted file mode 100644
index cde46f5..0000000
--- a/gradle/wrapper/gradle-wrapper.properties
+++ /dev/null
@@ -1,6 +0,0 @@
-#Fri Oct 07 16:09:33 PDT 2016
-distributionBase=GRADLE_USER_HOME
-distributionPath=wrapper/dists
-zipStoreBase=GRADLE_USER_HOME
-zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-3.0-bin.zip
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/README.md
----------------------------------------------------------------------
diff --git a/tests/README.md b/tests/README.md
index 9056b99..098922f 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -6,43 +6,6 @@ This directory contains Kafka system integration and performance tests.
(ducktape is a distributed testing framework which provides test runner,
result reporter and utilities to pull up and tear down services.)
-Running tests using docker
---------------------------
-Docker is used for running kafka system tests on travis-ci. And exactly same setup can be run for development purposes.
-
-* Run all tests
-```
-bash tests/travis/run_tests.sh
-```
-* Run all tests with debug on (warning will produce log of logs)
-```
-_DUCKTAPE_OPTIONS="--debug" bash tests/travis/run_tests.sh | tee debug_logs.txt
-```
-* Run a subset of tests
-```
-TC_PATHS="tests/kafkatest/tests/streams tests/kafkatest/tests/tools" bash tests/travis/run_tests.sh
-```
-
-Examining CI run
-----------------
-* Set BUILD_ID is travis ci's build id. E.g. build id is 169519874 for the following build
-```
-https://travis-ci.org/raghavgautam/kafka/builds/169519874
-```
-
-* Getting number of tests that were actually run
-```
-for id in $(curl -sSL https://api.travis-ci.org/builds/$BUILD_ID | jq '.matrix|map(.id)|.[]'); do curl -sSL "https://api.travis-ci.org/jobs/$id/log.txt?deansi=true" ; done | egrep 'SerialTestRunner.*setting up' | wc
-```
-* Getting number of tests that passed
-```
-for id in $(curl -sSL https://api.travis-ci.org/builds/$BUILD_ID | jq '.matrix|map(.id)|.[]'); do curl -sSL "https://api.travis-ci.org/jobs/$id/log.txt?deansi=true" ; done | egrep 'SerialTestRunner.*PASS' | wc
-```
-* Getting all the logs produced from a run
-```
-for id in $(curl -sSL https://api.travis-ci.org/builds/169519874 | jq '.matrix|map(.id)|.[]'); do curl -sSL "https://api.travis-ci.org/jobs/$id/log.txt?deansi=true" ; done
-```
-
Local Quickstart
----------------
This quickstart will help you run the Kafka system tests on your local machine. Note this requires bringing up a cluster of virtual machines on your local computer, which is memory intensive; it currently requires around 10G RAM.
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/cluster_file.json
----------------------------------------------------------------------
diff --git a/tests/cluster_file.json b/tests/cluster_file.json
deleted file mode 100644
index dbf3f04..0000000
--- a/tests/cluster_file.json
+++ /dev/null
@@ -1,97 +0,0 @@
-{
- "_comment": [
- "Licensed to the Apache Software Foundation (ASF) under one or more",
- "contributor license agreements. See the NOTICE file distributed with",
- "this work for additional information regarding copyright ownership.",
- "The ASF licenses this file to You under the Apache License, Version 2.0",
- "(the \"License\"); you may not use this file except in compliance with",
- "the License. You may obtain a copy of the License at",
- "",
- "http://www.apache.org/licenses/LICENSE-2.0",
- "",
- "Unless required by applicable law or agreed to in writing, software",
- "distributed under the License is distributed on an \"AS IS\" BASIS,",
- "WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.",
- "See the License for the specific language governing permissions and",
- "limitations under the License."
- ],
- "nodes": [
- {
- "hostname": "knode02.knw",
- "user": "root",
- "ssh_args": "",
- "ssh_hostname": "",
- "externally_routable_ip": "knode02.knw"
- },
- {
- "hostname": "knode03.knw",
- "user": "root",
- "ssh_args": "",
- "ssh_hostname": "",
- "externally_routable_ip": "knode03.knw"
- },
- {
- "hostname": "knode04.knw",
- "user": "root",
- "ssh_args": "",
- "ssh_hostname": "",
- "externally_routable_ip": "knode04.knw"
- },
- {
- "hostname": "knode05.knw",
- "user": "root",
- "ssh_args": "",
- "ssh_hostname": "",
- "externally_routable_ip": "knode05.knw"
- },
- {
- "hostname": "knode06.knw",
- "user": "root",
- "ssh_args": "",
- "ssh_hostname": "",
- "externally_routable_ip": "knode06.knw"
- },
- {
- "hostname": "knode07.knw",
- "user": "root",
- "ssh_args": "",
- "ssh_hostname": "",
- "externally_routable_ip": "knode07.knw"
- },
- {
- "hostname": "knode08.knw",
- "user": "root",
- "ssh_args": "",
- "ssh_hostname": "",
- "externally_routable_ip": "knode08.knw"
- },
- {
- "hostname": "knode09.knw",
- "user": "root",
- "ssh_args": "",
- "ssh_hostname": "",
- "externally_routable_ip": "knode09.knw"
- },
- {
- "hostname": "knode10.knw",
- "user": "root",
- "ssh_args": "",
- "ssh_hostname": "",
- "externally_routable_ip": "knode10.knw"
- },
- {
- "hostname": "knode11.knw",
- "user": "root",
- "ssh_args": "",
- "ssh_hostname": "",
- "externally_routable_ip": "knode11.knw"
- },
- {
- "hostname": "knode12.knw",
- "user": "root",
- "ssh_args": "",
- "ssh_hostname": "",
- "externally_routable_ip": "knode12.knw"
- }
- ]
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/directory_layout/kafka_path.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/directory_layout/kafka_path.py b/tests/kafkatest/directory_layout/kafka_path.py
index 9688174..0e60aff 100644
--- a/tests/kafkatest/directory_layout/kafka_path.py
+++ b/tests/kafkatest/directory_layout/kafka_path.py
@@ -44,11 +44,11 @@ TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME = "tools-dependant-libs"
JARS = {
"trunk": {
- CORE_JAR_NAME: "libs/*.jar",
- CORE_LIBS_JAR_NAME: "libs/*.jar",
- CORE_DEPENDANT_TEST_LIBS_JAR_NAME: "libs/*.jar",
- TOOLS_JAR_NAME: "libs/*.jar",
- TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME: "libs/*.jar"
+ CORE_JAR_NAME: "core/build/*/*.jar",
+ CORE_LIBS_JAR_NAME: "core/build/libs/*.jar",
+ CORE_DEPENDANT_TEST_LIBS_JAR_NAME: "core/build/dependant-testlibs/*.jar",
+ TOOLS_JAR_NAME: "tools/build/libs/kafka-tools*.jar",
+ TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME: "tools/build/dependant-libs*/*.jar"
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
index 626a0ff..14af4cf 100644
--- a/tests/kafkatest/services/mirror_maker.py
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -180,13 +180,13 @@ class MirrorMaker(KafkaPathResolverMixin, Service):
cmd = self.start_cmd(node)
self.logger.debug("Mirror maker command: %s", cmd)
node.account.ssh(cmd, allow_fail=False)
- wait_until(lambda: self.alive(node), timeout_sec=30, backoff_sec=.5,
+ wait_until(lambda: self.alive(node), timeout_sec=10, backoff_sec=.5,
err_msg="Mirror maker took to long to start.")
self.logger.debug("Mirror maker is alive")
def stop_node(self, node, clean_shutdown=True):
node.account.kill_process("java", allow_fail=True, clean_shutdown=clean_shutdown)
- wait_until(lambda: not self.alive(node), timeout_sec=30, backoff_sec=.5,
+ wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.5,
err_msg="Mirror maker took to long to stop.")
def clean_node(self, node):
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/__init__.py b/tests/kafkatest/tests/client/__init__.py
new file mode 100644
index 0000000..ec20143
--- /dev/null
+++ b/tests/kafkatest/tests/client/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client/compression_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/compression_test.py b/tests/kafkatest/tests/client/compression_test.py
new file mode 100644
index 0000000..0de53ae
--- /dev/null
+++ b/tests/kafkatest/tests/client/compression_test.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int_with_prefix
+
+class CompressionTest(ProduceConsumeValidateTest):
+ """
+ These tests validate produce / consume for compressed topics.
+ """
+
+ def __init__(self, test_context):
+ """:type test_context: ducktape.tests.test.TestContext"""
+ super(CompressionTest, self).__init__(test_context=test_context)
+
+ self.topic = "test_topic"
+ self.zk = ZookeeperService(test_context, num_nodes=1)
+ self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
+ "partitions": 10,
+ "replication-factor": 1}})
+ self.num_partitions = 10
+ self.timeout_sec = 60
+ self.producer_throughput = 1000
+ self.num_producers = 4
+ self.messages_per_producer = 1000
+ self.num_consumers = 1
+
+ def setUp(self):
+ self.zk.start()
+
+ def min_cluster_size(self):
+ # Override this since we're adding services outside of the constructor
+ return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+ @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=True)
+ @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=False)
+ def test_compressed_topic(self, compression_types, new_consumer):
+ """Test produce => consume => validate for compressed topics
+ Setup: 1 zk, 1 kafka node, 1 topic with partitions=10, replication-factor=1
+
+ compression_types parameter gives a list of compression types (or no compression if
+ "none"). Each producer in a VerifiableProducer group (num_producers = 4) will use a
+ compression type from the list based on producer's index in the group.
+
+ - Produce messages in the background
+ - Consume messages in the background
+ - Stop producing, and finish consuming
+ - Validate that every acked message was consumed
+ """
+
+ self.kafka.security_protocol = "PLAINTEXT"
+ self.kafka.interbroker_security_protocol = self.kafka.security_protocol
+ self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+ self.topic, throughput=self.producer_throughput,
+ message_validator=is_int_with_prefix,
+ compression_types=compression_types)
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic,
+ new_consumer=new_consumer, consumer_timeout_ms=60000,
+ message_validator=is_int_with_prefix)
+ self.kafka.start()
+
+ self.run_produce_consume_validate(lambda: wait_until(
+ lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+ timeout_sec=120, backoff_sec=1,
+ err_msg="Producer did not produce all messages in reasonable amount of time"))
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
new file mode 100644
index 0000000..3cd3c7c
--- /dev/null
+++ b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition
+
+class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
+ TOPIC = "test_topic"
+ NUM_PARTITIONS = 4
+ RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
+ ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
+
+ def __init__(self, test_context):
+ super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_consumers=2, num_producers=0,
+ num_zk=1, num_brokers=1, topics={
+ self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }
+ })
+
+ def _verify_range_assignment(self, consumer):
+ # range assignment should give us two partition sets: (0, 1) and (2, 3)
+ assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()])
+ assert assignment == set([
+ frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]),
+ frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])])
+
+ def _verify_roundrobin_assignment(self, consumer):
+ assignment = set([frozenset(x) for x in consumer.current_assignment().values()])
+ assert assignment == set([
+ frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
+ frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
+
+ def rolling_update_test(self):
+ """
+ Verify rolling updates of partition assignment strategies works correctly. In this
+ test, we use a rolling restart to change the group's assignment strategy from "range"
+ to "roundrobin." We verify after every restart that all members are still in the group
+ and that the correct assignment strategy was used.
+ """
+
+ # initialize the consumer using range assignment
+ consumer = self.setup_consumer(self.TOPIC, assignment_strategy=self.RANGE)
+
+ consumer.start()
+ self.await_all_members(consumer)
+ self._verify_range_assignment(consumer)
+
+ # change consumer configuration to prefer round-robin assignment, but still support range assignment
+ consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE
+
+ # restart one of the nodes and verify that we are still using range assignment
+ consumer.stop_node(consumer.nodes[0])
+ consumer.start_node(consumer.nodes[0])
+ self.await_all_members(consumer)
+ self._verify_range_assignment(consumer)
+
+ # now restart the other node and verify that we have switched to round-robin
+ consumer.stop_node(consumer.nodes[1])
+ consumer.start_node(consumer.nodes[1])
+ self.await_all_members(consumer)
+ self._verify_roundrobin_assignment(consumer)
+
+ # if we want, we can now drop support for range assignment
+ consumer.assignment_strategy = self.ROUND_ROBIN
+ for node in consumer.nodes:
+ consumer.stop_node(node)
+ consumer.start_node(node)
+ self.await_all_members(consumer)
+ self._verify_roundrobin_assignment(consumer)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client/consumer_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
new file mode 100644
index 0000000..534f65c
--- /dev/null
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition
+
+import signal
+
+class OffsetValidationTest(VerifiableConsumerTest):
+ TOPIC = "test_topic"
+ NUM_PARTITIONS = 1
+
+ def __init__(self, test_context):
+ super(OffsetValidationTest, self).__init__(test_context, num_consumers=3, num_producers=1,
+ num_zk=1, num_brokers=2, topics={
+ self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 2 }
+ })
+
+ def rolling_bounce_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+ for _ in range(num_bounces):
+ for node in consumer.nodes:
+ consumer.stop_node(node, clean_shutdown)
+
+ wait_until(lambda: len(consumer.dead_nodes()) == 1,
+ timeout_sec=self.session_timeout_sec+5,
+ err_msg="Timed out waiting for the consumer to shutdown")
+
+ consumer.start_node(node)
+
+ self.await_all_members(consumer)
+ self.await_consumed_messages(consumer)
+
+ def bounce_all_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+ for _ in range(num_bounces):
+ for node in consumer.nodes:
+ consumer.stop_node(node, clean_shutdown)
+
+ wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, timeout_sec=10,
+ err_msg="Timed out waiting for the consumers to shutdown")
+
+ for node in consumer.nodes:
+ consumer.start_node(node)
+
+ self.await_all_members(consumer)
+ self.await_consumed_messages(consumer)
+
+ def rolling_bounce_brokers(self, consumer, num_bounces=5, clean_shutdown=True):
+ for _ in range(num_bounces):
+ for node in self.kafka.nodes:
+ self.kafka.restart_node(node, clean_shutdown=True)
+ self.await_all_members(consumer)
+ self.await_consumed_messages(consumer)
+
+ def setup_consumer(self, topic, **kwargs):
+ # collect verifiable consumer events since this makes debugging much easier
+ consumer = super(OffsetValidationTest, self).setup_consumer(topic, **kwargs)
+ self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
+ return consumer
+
+ def test_broker_rolling_bounce(self):
+ """
+ Verify correct consumer behavior when the brokers are consecutively restarted.
+
+ Setup: single Kafka cluster with one producer writing messages to a single topic with one
+ partition, an a set of consumers in the same group reading from the same topic.
+
+ - Start a producer which continues producing new messages throughout the test.
+ - Start up the consumers and wait until they've joined the group.
+ - In a loop, restart each broker consecutively, waiting for the group to stabilize between
+ each broker restart.
+ - Verify delivery semantics according to the failure type and that the broker bounces
+ did not cause unexpected group rebalances.
+ """
+ partition = TopicPartition(self.TOPIC, 0)
+
+ producer = self.setup_producer(self.TOPIC)
+ consumer = self.setup_consumer(self.TOPIC)
+
+ producer.start()
+ self.await_produced_messages(producer)
+
+ consumer.start()
+ self.await_all_members(consumer)
+
+ num_rebalances = consumer.num_rebalances()
+ # TODO: make this test work with hard shutdowns, which probably requires
+ # pausing before the node is restarted to ensure that any ephemeral
+ # nodes have time to expire
+ self.rolling_bounce_brokers(consumer, clean_shutdown=True)
+
+ unexpected_rebalances = consumer.num_rebalances() - num_rebalances
+ assert unexpected_rebalances == 0, \
+ "Broker rolling bounce caused %d unexpected group rebalances" % unexpected_rebalances
+
+ consumer.stop_all()
+
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+
+ @matrix(clean_shutdown=[True, False], bounce_mode=["all", "rolling"])
+ def test_consumer_bounce(self, clean_shutdown, bounce_mode):
+ """
+ Verify correct consumer behavior when the consumers in the group are consecutively restarted.
+
+ Setup: single Kafka cluster with one producer and a set of consumers in one group.
+
+ - Start a producer which continues producing new messages throughout the test.
+ - Start up the consumers and wait until they've joined the group.
+ - In a loop, restart each consumer, waiting for each one to rejoin the group before
+ restarting the rest.
+ - Verify delivery semantics according to the failure type.
+ """
+ partition = TopicPartition(self.TOPIC, 0)
+
+ producer = self.setup_producer(self.TOPIC)
+ consumer = self.setup_consumer(self.TOPIC)
+
+ producer.start()
+ self.await_produced_messages(producer)
+
+ consumer.start()
+ self.await_all_members(consumer)
+
+ if bounce_mode == "all":
+ self.bounce_all_consumers(consumer, clean_shutdown=clean_shutdown)
+ else:
+ self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
+
+ consumer.stop_all()
+ if clean_shutdown:
+ # if the total records consumed matches the current position, we haven't seen any duplicates
+ # this can only be guaranteed with a clean shutdown
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+ else:
+ # we may have duplicates in a hard failure
+ assert consumer.current_position(partition) <= consumer.total_consumed(), \
+ "Current position greater than the total number of consumed records"
+
+ @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
+ def test_consumer_failure(self, clean_shutdown, enable_autocommit):
+ partition = TopicPartition(self.TOPIC, 0)
+
+ consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
+ producer = self.setup_producer(self.TOPIC)
+
+ consumer.start()
+ self.await_all_members(consumer)
+
+ partition_owner = consumer.owner(partition)
+ assert partition_owner is not None
+
+ # startup the producer and ensure that some records have been written
+ producer.start()
+ self.await_produced_messages(producer)
+
+ # stop the partition owner and await its shutdown
+ consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown)
+ wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers - 1) and consumer.owner(partition) != None,
+ timeout_sec=self.session_timeout_sec+5, err_msg="Timed out waiting for consumer to close")
+
+ # ensure that the remaining consumer does some work after rebalancing
+ self.await_consumed_messages(consumer, min_messages=1000)
+
+ consumer.stop_all()
+
+ if clean_shutdown:
+ # if the total records consumed matches the current position, we haven't seen any duplicates
+ # this can only be guaranteed with a clean shutdown
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+ else:
+ # we may have duplicates in a hard failure
+ assert consumer.current_position(partition) <= consumer.total_consumed(), \
+ "Current position greater than the total number of consumed records"
+
+ # if autocommit is not turned on, we can also verify the last committed offset
+ if not enable_autocommit:
+ assert consumer.last_commit(partition) == consumer.current_position(partition), \
+ "Last committed offset did not match last consumed position"
+
+
+ @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
+ def test_broker_failure(self, clean_shutdown, enable_autocommit):
+ partition = TopicPartition(self.TOPIC, 0)
+
+ consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
+ producer = self.setup_producer(self.TOPIC)
+
+ producer.start()
+ consumer.start()
+ self.await_all_members(consumer)
+
+ num_rebalances = consumer.num_rebalances()
+
+ # shutdown one of the brokers
+ # TODO: we need a way to target the coordinator instead of picking arbitrarily
+ self.kafka.signal_node(self.kafka.nodes[0], signal.SIGTERM if clean_shutdown else signal.SIGKILL)
+
+ # ensure that the consumers do some work after the broker failure
+ self.await_consumed_messages(consumer, min_messages=1000)
+
+ # verify that there were no rebalances on failover
+ assert num_rebalances == consumer.num_rebalances(), "Broker failure should not cause a rebalance"
+
+ consumer.stop_all()
+
+ # if the total records consumed matches the current position, we haven't seen any duplicates
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+
+ # if autocommit is not turned on, we can also verify the last committed offset
+ if not enable_autocommit:
+ assert consumer.last_commit(partition) == consumer.current_position(partition), \
+ "Last committed offset did not match last consumed position"
+
+ def test_group_consumption(self):
+ """
+ Verifies correct group rebalance behavior as consumers are started and stopped.
+ In particular, this test verifies that the partition is readable after every
+ expected rebalance.
+
+ Setup: single Kafka cluster with a group of consumers reading from one topic
+ with one partition while the verifiable producer writes to it.
+
+ - Start the consumers one by one, verifying consumption after each rebalance
+ - Shutdown the consumers one by one, verifying consumption after each rebalance
+ """
+ consumer = self.setup_consumer(self.TOPIC)
+ producer = self.setup_producer(self.TOPIC)
+
+ partition = TopicPartition(self.TOPIC, 0)
+
+ producer.start()
+
+ for num_started, node in enumerate(consumer.nodes, 1):
+ consumer.start_node(node)
+ self.await_members(consumer, num_started)
+ self.await_consumed_messages(consumer)
+
+ for num_stopped, node in enumerate(consumer.nodes, 1):
+ consumer.stop_node(node)
+
+ if num_stopped < self.num_consumers:
+ self.await_members(consumer, self.num_consumers - num_stopped)
+ self.await_consumed_messages(consumer)
+
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+
+ assert consumer.last_commit(partition) == consumer.current_position(partition), \
+ "Last committed offset did not match last consumed position"
+
+
+class AssignmentValidationTest(VerifiableConsumerTest):
+ TOPIC = "test_topic"
+ NUM_PARTITIONS = 6
+
+ def __init__(self, test_context):
+ super(AssignmentValidationTest, self).__init__(test_context, num_consumers=3, num_producers=0,
+ num_zk=1, num_brokers=2, topics={
+ self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 },
+ })
+
+ @matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
+ "org.apache.kafka.clients.consumer.RoundRobinAssignor"])
+ def test_valid_assignment(self, assignment_strategy):
+ """
+ Verify assignment strategy correctness: each partition is assigned to exactly
+ one consumer instance.
+
+ Setup: single Kafka cluster with a set of consumers in the same group.
+
+ - Start the consumers one by one
+ - Validate assignment after every expected rebalance
+ """
+ consumer = self.setup_consumer(self.TOPIC, assignment_strategy=assignment_strategy)
+ for num_started, node in enumerate(consumer.nodes, 1):
+ consumer.start_node(node)
+ self.await_members(consumer, num_started)
+ assert self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment())
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client/message_format_change_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/message_format_change_test.py b/tests/kafkatest/tests/client/message_format_change_test.py
new file mode 100644
index 0000000..a57c04b
--- /dev/null
+++ b/tests/kafkatest/tests/client/message_format_change_test.py
@@ -0,0 +1,90 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+from kafkatest.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
+
+
+class MessageFormatChangeTest(ProduceConsumeValidateTest):
+
+ def __init__(self, test_context):
+ super(MessageFormatChangeTest, self).__init__(test_context=test_context)
+
+ def setUp(self):
+ self.topic = "test_topic"
+ self.zk = ZookeeperService(self.test_context, num_nodes=1)
+
+ self.zk.start()
+
+ # Producer and consumer
+ self.producer_throughput = 10000
+ self.num_producers = 1
+ self.num_consumers = 1
+ self.messages_per_producer = 100
+
+ def produce_and_consume(self, producer_version, consumer_version, group):
+ self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+ self.topic,
+ throughput=self.producer_throughput,
+ message_validator=is_int,
+ version=KafkaVersion(producer_version))
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+ self.topic, new_consumer=False, consumer_timeout_ms=30000,
+ message_validator=is_int, version=KafkaVersion(consumer_version))
+ self.consumer.group_id = group
+ self.run_produce_consume_validate(lambda: wait_until(
+ lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+ timeout_sec=120, backoff_sec=1,
+ err_msg="Producer did not produce all messages in reasonable amount of time"))
+
+ @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
+ @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
+ def test_compatibility(self, producer_version, consumer_version):
+ """ This tests performs the following checks:
+ The workload is a mix of 0.9.x and 0.10.x producers and consumers
+ that produce to and consume from a 0.10.x cluster
+ 1. initially the topic is using message format 0.9.0
+ 2. change the message format version for topic to 0.10.0 on the fly.
+ 3. change the message format version for topic back to 0.9.0 on the fly.
+ - The producers and consumers should not have any issue.
+ - Note that for 0.9.x consumers/producers we only do steps 1 and 2
+ """
+ self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
+ "partitions": 3,
+ "replication-factor": 3,
+ 'configs': {"min.insync.replicas": 2}}})
+
+ self.kafka.start()
+ self.logger.info("First format change to 0.9.0")
+ self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
+ self.produce_and_consume(producer_version, consumer_version, "group1")
+
+ self.logger.info("Second format change to 0.10.0")
+ self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
+ self.produce_and_consume(producer_version, consumer_version, "group2")
+
+ if producer_version == str(TRUNK) and consumer_version == str(TRUNK):
+ self.logger.info("Third format change back to 0.9.0")
+ self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
+ self.produce_and_consume(producer_version, consumer_version, "group3")
+
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
new file mode 100644
index 0000000..1d31569
--- /dev/null
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -0,0 +1,219 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark import matrix, parametrize
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.performance import ProducerPerformanceService
+from kafkatest.services.console_consumer import ConsoleConsumer
+
+class QuotaConfig(object):
+ CLIENT_ID = 'client-id'
+ USER = 'user'
+ USER_CLIENT = '(user, client-id)'
+
+ LARGE_QUOTA = 1000 * 1000 * 1000
+ USER_PRINCIPAL = 'CN=systemtest'
+
+ def __init__(self, quota_type, override_quota, kafka):
+ if quota_type == QuotaConfig.CLIENT_ID:
+ if override_quota:
+ self.client_id = 'overridden_id'
+ self.producer_quota = 3750000
+ self.consumer_quota = 3000000
+ self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients', self.client_id])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
+ else:
+ self.client_id = 'default_id'
+ self.producer_quota = 2500000
+ self.consumer_quota = 2000000
+ self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients', None])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', 'overridden_id'])
+ elif quota_type == QuotaConfig.USER:
+ if override_quota:
+ self.client_id = 'some_id'
+ self.producer_quota = 3750000
+ self.consumer_quota = 3000000
+ self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', self.client_id])
+ else:
+ self.client_id = 'some_id'
+ self.producer_quota = 2500000
+ self.consumer_quota = 2000000
+ self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
+ elif quota_type == QuotaConfig.USER_CLIENT:
+ if override_quota:
+ self.client_id = 'overridden_id'
+ self.producer_quota = 3750000
+ self.consumer_quota = 3000000
+ self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', self.client_id])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', None])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', self.client_id])
+ else:
+ self.client_id = 'default_id'
+ self.producer_quota = 2500000
+ self.consumer_quota = 2000000
+ self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None, 'clients', None])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
+ self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
+
+ def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, entity_args):
+ node = kafka.nodes[0]
+ cmd = "%s --zookeeper %s --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d" % \
+ (kafka.path.script("kafka-configs.sh", node), kafka.zk.connect_setting(), producer_byte_rate, consumer_byte_rate)
+ cmd += " --entity-type " + entity_args[0] + self.entity_name_opt(entity_args[1])
+ if len(entity_args) > 2:
+ cmd += " --entity-type " + entity_args[2] + self.entity_name_opt(entity_args[3])
+ node.account.ssh(cmd)
+
+ def entity_name_opt(self, name):
+ return " --entity-default" if name is None else " --entity-name " + name
+
+class QuotaTest(Test):
+ """
+ These tests verify that quota provides expected functionality -- they run
+ producer, broker, and consumer with different clientId and quota configuration and
+ check that the observed throughput is close to the value we expect.
+ """
+
+ def __init__(self, test_context):
+ """:type test_context: ducktape.tests.test.TestContext"""
+ super(QuotaTest, self).__init__(test_context=test_context)
+
+ self.topic = 'test_topic'
+ self.logger.info('use topic ' + self.topic)
+
+ self.maximum_client_deviation_percentage = 100.0
+ self.maximum_broker_deviation_percentage = 5.0
+ self.num_records = 50000
+ self.record_size = 3000
+
+ self.zk = ZookeeperService(test_context, num_nodes=1)
+ self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
+ security_protocol='SSL', authorizer_class_name='',
+ interbroker_security_protocol='SSL',
+ topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}},
+ jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
+ 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],
+ jmx_attributes=['OneMinuteRate'])
+ self.num_producers = 1
+ self.num_consumers = 2
+
+ def setUp(self):
+ self.zk.start()
+ self.kafka.start()
+
+ def min_cluster_size(self):
+ """Override this since we're adding services outside of the constructor"""
+ return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+ @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False])
+ @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
+ def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):
+ self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
+ producer_client_id = self.quota_config.client_id
+ consumer_client_id = self.quota_config.client_id
+
+ # Produce all messages
+ producer = ProducerPerformanceService(
+ self.test_context, producer_num, self.kafka,
+ topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id,
+ jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_client_id], jmx_attributes=['outgoing-byte-rate'])
+
+ producer.run()
+
+ # Consume all messages
+ consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
+ consumer_timeout_ms=60000, client_id=consumer_client_id,
+ jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s' % consumer_client_id],
+ jmx_attributes=['bytes-consumed-rate'])
+ consumer.run()
+
+ for idx, messages in consumer.messages_consumed.iteritems():
+ assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx
+
+ success, msg = self.validate(self.kafka, producer, consumer)
+ assert success, msg
+
+ def validate(self, broker, producer, consumer):
+ """
+ For each client_id we validate that:
+ 1) number of consumed messages equals number of produced messages
+ 2) maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
+ 3) maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
+ 4) maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
+ 5) maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
+ """
+ success = True
+ msg = ''
+
+ self.kafka.read_jmx_output_all_nodes()
+
+ # validate that number of consumed messages equals number of produced messages
+ produced_num = sum([value['records'] for value in producer.results])
+ consumed_num = sum([len(value) for value in consumer.messages_consumed.values()])
+ self.logger.info('producer produced %d messages' % produced_num)
+ self.logger.info('consumer consumed %d messages' % consumed_num)
+ if produced_num != consumed_num:
+ success = False
+ msg += "number of produced messages %d doesn't equal number of consumed messages %d" % (produced_num, consumed_num)
+
+ # validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
+ producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % producer.client_id
+ producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
+ producer_quota_bps = self.quota_config.producer_quota
+ self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps))
+ if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
+ success = False
+ msg += 'maximum producer throughput %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
+ (producer_maximum_bps, producer_quota_bps, self.maximum_client_deviation_percentage)
+
+ # validate that maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
+ broker_byte_in_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:OneMinuteRate'
+ broker_maximum_byte_in_bps = broker.maximum_jmx_value[broker_byte_in_attribute_name]
+ self.logger.info('broker has maximum byte-in rate %.2f bps with producer quota %.2f bps' %
+ (broker_maximum_byte_in_bps, producer_quota_bps))
+ if broker_maximum_byte_in_bps > producer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
+ success = False
+ msg += 'maximum broker byte-in rate %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
+ (broker_maximum_byte_in_bps, producer_quota_bps, self.maximum_broker_deviation_percentage)
+
+ # validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
+ consumer_attribute_name = 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s:bytes-consumed-rate' % consumer.client_id
+ consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name]
+ consumer_quota_bps = self.quota_config.consumer_quota
+ self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f bps' % (consumer_maximum_bps, consumer_quota_bps))
+ if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
+ success = False
+ msg += 'maximum consumer throughput %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
+ (consumer_maximum_bps, consumer_quota_bps, self.maximum_client_deviation_percentage)
+
+ # validate that maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
+ broker_byte_out_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec:OneMinuteRate'
+ broker_maximum_byte_out_bps = broker.maximum_jmx_value[broker_byte_out_attribute_name]
+ self.logger.info('broker has maximum byte-out rate %.2f bps with consumer quota %.2f bps' %
+ (broker_maximum_byte_out_bps, consumer_quota_bps))
+ if broker_maximum_byte_out_bps > consumer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
+ success = False
+ msg += 'maximum broker byte-out rate %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
+ (broker_maximum_byte_out_bps, consumer_quota_bps, self.maximum_broker_deviation_percentage)
+
+ return success, msg
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client1/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client1/__init__.py b/tests/kafkatest/tests/client1/__init__.py
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client1/consumer_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client1/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client1/consumer_rolling_upgrade_test.py
deleted file mode 100644
index 3cd3c7c..0000000
--- a/tests/kafkatest/tests/client1/consumer_rolling_upgrade_test.py
+++ /dev/null
@@ -1,82 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.utils.util import wait_until
-
-from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
-from kafkatest.services.kafka import TopicPartition
-
-class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
- TOPIC = "test_topic"
- NUM_PARTITIONS = 4
- RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
- ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
-
- def __init__(self, test_context):
- super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_consumers=2, num_producers=0,
- num_zk=1, num_brokers=1, topics={
- self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }
- })
-
- def _verify_range_assignment(self, consumer):
- # range assignment should give us two partition sets: (0, 1) and (2, 3)
- assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()])
- assert assignment == set([
- frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]),
- frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])])
-
- def _verify_roundrobin_assignment(self, consumer):
- assignment = set([frozenset(x) for x in consumer.current_assignment().values()])
- assert assignment == set([
- frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
- frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
-
- def rolling_update_test(self):
- """
- Verify rolling updates of partition assignment strategies works correctly. In this
- test, we use a rolling restart to change the group's assignment strategy from "range"
- to "roundrobin." We verify after every restart that all members are still in the group
- and that the correct assignment strategy was used.
- """
-
- # initialize the consumer using range assignment
- consumer = self.setup_consumer(self.TOPIC, assignment_strategy=self.RANGE)
-
- consumer.start()
- self.await_all_members(consumer)
- self._verify_range_assignment(consumer)
-
- # change consumer configuration to prefer round-robin assignment, but still support range assignment
- consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE
-
- # restart one of the nodes and verify that we are still using range assignment
- consumer.stop_node(consumer.nodes[0])
- consumer.start_node(consumer.nodes[0])
- self.await_all_members(consumer)
- self._verify_range_assignment(consumer)
-
- # now restart the other node and verify that we have switched to round-robin
- consumer.stop_node(consumer.nodes[1])
- consumer.start_node(consumer.nodes[1])
- self.await_all_members(consumer)
- self._verify_roundrobin_assignment(consumer)
-
- # if we want, we can now drop support for range assignment
- consumer.assignment_strategy = self.ROUND_ROBIN
- for node in consumer.nodes:
- consumer.stop_node(node)
- consumer.start_node(node)
- self.await_all_members(consumer)
- self._verify_roundrobin_assignment(consumer)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client1/message_format_change_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client1/message_format_change_test.py b/tests/kafkatest/tests/client1/message_format_change_test.py
deleted file mode 100644
index a57c04b..0000000
--- a/tests/kafkatest/tests/client1/message_format_change_test.py
+++ /dev/null
@@ -1,90 +0,0 @@
-# Copyright 2015 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-from kafkatest.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
-
-
-class MessageFormatChangeTest(ProduceConsumeValidateTest):
-
- def __init__(self, test_context):
- super(MessageFormatChangeTest, self).__init__(test_context=test_context)
-
- def setUp(self):
- self.topic = "test_topic"
- self.zk = ZookeeperService(self.test_context, num_nodes=1)
-
- self.zk.start()
-
- # Producer and consumer
- self.producer_throughput = 10000
- self.num_producers = 1
- self.num_consumers = 1
- self.messages_per_producer = 100
-
- def produce_and_consume(self, producer_version, consumer_version, group):
- self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
- self.topic,
- throughput=self.producer_throughput,
- message_validator=is_int,
- version=KafkaVersion(producer_version))
- self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
- self.topic, new_consumer=False, consumer_timeout_ms=30000,
- message_validator=is_int, version=KafkaVersion(consumer_version))
- self.consumer.group_id = group
- self.run_produce_consume_validate(lambda: wait_until(
- lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
- timeout_sec=120, backoff_sec=1,
- err_msg="Producer did not produce all messages in reasonable amount of time"))
-
- @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
- @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
- def test_compatibility(self, producer_version, consumer_version):
- """ This tests performs the following checks:
- The workload is a mix of 0.9.x and 0.10.x producers and consumers
- that produce to and consume from a 0.10.x cluster
- 1. initially the topic is using message format 0.9.0
- 2. change the message format version for topic to 0.10.0 on the fly.
- 3. change the message format version for topic back to 0.9.0 on the fly.
- - The producers and consumers should not have any issue.
- - Note that for 0.9.x consumers/producers we only do steps 1 and 2
- """
- self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
- "partitions": 3,
- "replication-factor": 3,
- 'configs': {"min.insync.replicas": 2}}})
-
- self.kafka.start()
- self.logger.info("First format change to 0.9.0")
- self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
- self.produce_and_consume(producer_version, consumer_version, "group1")
-
- self.logger.info("Second format change to 0.10.0")
- self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
- self.produce_and_consume(producer_version, consumer_version, "group2")
-
- if producer_version == str(TRUNK) and consumer_version == str(TRUNK):
- self.logger.info("Third format change back to 0.9.0")
- self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
- self.produce_and_consume(producer_version, consumer_version, "group3")
-
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client1/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client1/quota_test.py b/tests/kafkatest/tests/client1/quota_test.py
deleted file mode 100644
index 1d31569..0000000
--- a/tests/kafkatest/tests/client1/quota_test.py
+++ /dev/null
@@ -1,219 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.tests.test import Test
-from ducktape.mark import matrix, parametrize
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.performance import ProducerPerformanceService
-from kafkatest.services.console_consumer import ConsoleConsumer
-
-class QuotaConfig(object):
- CLIENT_ID = 'client-id'
- USER = 'user'
- USER_CLIENT = '(user, client-id)'
-
- LARGE_QUOTA = 1000 * 1000 * 1000
- USER_PRINCIPAL = 'CN=systemtest'
-
- def __init__(self, quota_type, override_quota, kafka):
- if quota_type == QuotaConfig.CLIENT_ID:
- if override_quota:
- self.client_id = 'overridden_id'
- self.producer_quota = 3750000
- self.consumer_quota = 3000000
- self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients', self.client_id])
- self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
- else:
- self.client_id = 'default_id'
- self.producer_quota = 2500000
- self.consumer_quota = 2000000
- self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients', None])
- self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', 'overridden_id'])
- elif quota_type == QuotaConfig.USER:
- if override_quota:
- self.client_id = 'some_id'
- self.producer_quota = 3750000
- self.consumer_quota = 3000000
- self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL])
- self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
- self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', self.client_id])
- else:
- self.client_id = 'some_id'
- self.producer_quota = 2500000
- self.consumer_quota = 2000000
- self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None])
- self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
- elif quota_type == QuotaConfig.USER_CLIENT:
- if override_quota:
- self.client_id = 'overridden_id'
- self.producer_quota = 3750000
- self.consumer_quota = 3000000
- self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', self.client_id])
- self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', None])
- self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
- self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', self.client_id])
- else:
- self.client_id = 'default_id'
- self.producer_quota = 2500000
- self.consumer_quota = 2000000
- self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None, 'clients', None])
- self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
- self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
-
- def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, entity_args):
- node = kafka.nodes[0]
- cmd = "%s --zookeeper %s --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d" % \
- (kafka.path.script("kafka-configs.sh", node), kafka.zk.connect_setting(), producer_byte_rate, consumer_byte_rate)
- cmd += " --entity-type " + entity_args[0] + self.entity_name_opt(entity_args[1])
- if len(entity_args) > 2:
- cmd += " --entity-type " + entity_args[2] + self.entity_name_opt(entity_args[3])
- node.account.ssh(cmd)
-
- def entity_name_opt(self, name):
- return " --entity-default" if name is None else " --entity-name " + name
-
-class QuotaTest(Test):
- """
- These tests verify that quota provides expected functionality -- they run
- producer, broker, and consumer with different clientId and quota configuration and
- check that the observed throughput is close to the value we expect.
- """
-
- def __init__(self, test_context):
- """:type test_context: ducktape.tests.test.TestContext"""
- super(QuotaTest, self).__init__(test_context=test_context)
-
- self.topic = 'test_topic'
- self.logger.info('use topic ' + self.topic)
-
- self.maximum_client_deviation_percentage = 100.0
- self.maximum_broker_deviation_percentage = 5.0
- self.num_records = 50000
- self.record_size = 3000
-
- self.zk = ZookeeperService(test_context, num_nodes=1)
- self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
- security_protocol='SSL', authorizer_class_name='',
- interbroker_security_protocol='SSL',
- topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}},
- jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
- 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],
- jmx_attributes=['OneMinuteRate'])
- self.num_producers = 1
- self.num_consumers = 2
-
- def setUp(self):
- self.zk.start()
- self.kafka.start()
-
- def min_cluster_size(self):
- """Override this since we're adding services outside of the constructor"""
- return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
-
- @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False])
- @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
- def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):
- self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
- producer_client_id = self.quota_config.client_id
- consumer_client_id = self.quota_config.client_id
-
- # Produce all messages
- producer = ProducerPerformanceService(
- self.test_context, producer_num, self.kafka,
- topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id,
- jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_client_id], jmx_attributes=['outgoing-byte-rate'])
-
- producer.run()
-
- # Consume all messages
- consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
- consumer_timeout_ms=60000, client_id=consumer_client_id,
- jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s' % consumer_client_id],
- jmx_attributes=['bytes-consumed-rate'])
- consumer.run()
-
- for idx, messages in consumer.messages_consumed.iteritems():
- assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx
-
- success, msg = self.validate(self.kafka, producer, consumer)
- assert success, msg
-
- def validate(self, broker, producer, consumer):
- """
- For each client_id we validate that:
- 1) number of consumed messages equals number of produced messages
- 2) maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
- 3) maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
- 4) maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
- 5) maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
- """
- success = True
- msg = ''
-
- self.kafka.read_jmx_output_all_nodes()
-
- # validate that number of consumed messages equals number of produced messages
- produced_num = sum([value['records'] for value in producer.results])
- consumed_num = sum([len(value) for value in consumer.messages_consumed.values()])
- self.logger.info('producer produced %d messages' % produced_num)
- self.logger.info('consumer consumed %d messages' % consumed_num)
- if produced_num != consumed_num:
- success = False
- msg += "number of produced messages %d doesn't equal number of consumed messages %d" % (produced_num, consumed_num)
-
- # validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
- producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % producer.client_id
- producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
- producer_quota_bps = self.quota_config.producer_quota
- self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps))
- if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
- success = False
- msg += 'maximum producer throughput %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
- (producer_maximum_bps, producer_quota_bps, self.maximum_client_deviation_percentage)
-
- # validate that maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
- broker_byte_in_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:OneMinuteRate'
- broker_maximum_byte_in_bps = broker.maximum_jmx_value[broker_byte_in_attribute_name]
- self.logger.info('broker has maximum byte-in rate %.2f bps with producer quota %.2f bps' %
- (broker_maximum_byte_in_bps, producer_quota_bps))
- if broker_maximum_byte_in_bps > producer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
- success = False
- msg += 'maximum broker byte-in rate %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
- (broker_maximum_byte_in_bps, producer_quota_bps, self.maximum_broker_deviation_percentage)
-
- # validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
- consumer_attribute_name = 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s:bytes-consumed-rate' % consumer.client_id
- consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name]
- consumer_quota_bps = self.quota_config.consumer_quota
- self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f bps' % (consumer_maximum_bps, consumer_quota_bps))
- if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
- success = False
- msg += 'maximum consumer throughput %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
- (consumer_maximum_bps, consumer_quota_bps, self.maximum_client_deviation_percentage)
-
- # validate that maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
- broker_byte_out_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec:OneMinuteRate'
- broker_maximum_byte_out_bps = broker.maximum_jmx_value[broker_byte_out_attribute_name]
- self.logger.info('broker has maximum byte-out rate %.2f bps with consumer quota %.2f bps' %
- (broker_maximum_byte_out_bps, consumer_quota_bps))
- if broker_maximum_byte_out_bps > consumer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
- success = False
- msg += 'maximum broker byte-out rate %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
- (broker_maximum_byte_out_bps, consumer_quota_bps, self.maximum_broker_deviation_percentage)
-
- return success, msg
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/client2/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client2/__init__.py b/tests/kafkatest/tests/client2/__init__.py
deleted file mode 100644
index ec20143..0000000
--- a/tests/kafkatest/tests/client2/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.