You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2016/11/24 04:49:05 UTC
[3/4] kafka git commit: KAFKA-4345;
Run ducktape test for each pull request
http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client2/consumer_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client2/consumer_test.py b/tests/kafkatest/tests/client2/consumer_test.py
new file mode 100644
index 0000000..534f65c
--- /dev/null
+++ b/tests/kafkatest/tests/client2/consumer_test.py
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition
+
+import signal
+
+class OffsetValidationTest(VerifiableConsumerTest):
+ TOPIC = "test_topic"
+ NUM_PARTITIONS = 1
+
+ def __init__(self, test_context):
+ super(OffsetValidationTest, self).__init__(test_context, num_consumers=3, num_producers=1,
+ num_zk=1, num_brokers=2, topics={
+ self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 2 }
+ })
+
+ def rolling_bounce_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+ for _ in range(num_bounces):
+ for node in consumer.nodes:
+ consumer.stop_node(node, clean_shutdown)
+
+ wait_until(lambda: len(consumer.dead_nodes()) == 1,
+ timeout_sec=self.session_timeout_sec+5,
+ err_msg="Timed out waiting for the consumer to shutdown")
+
+ consumer.start_node(node)
+
+ self.await_all_members(consumer)
+ self.await_consumed_messages(consumer)
+
+ def bounce_all_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+ for _ in range(num_bounces):
+ for node in consumer.nodes:
+ consumer.stop_node(node, clean_shutdown)
+
+ wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, timeout_sec=10,
+ err_msg="Timed out waiting for the consumers to shutdown")
+
+ for node in consumer.nodes:
+ consumer.start_node(node)
+
+ self.await_all_members(consumer)
+ self.await_consumed_messages(consumer)
+
+ def rolling_bounce_brokers(self, consumer, num_bounces=5, clean_shutdown=True):
+ for _ in range(num_bounces):
+ for node in self.kafka.nodes:
+ self.kafka.restart_node(node, clean_shutdown=True)
+ self.await_all_members(consumer)
+ self.await_consumed_messages(consumer)
+
+ def setup_consumer(self, topic, **kwargs):
+ # collect verifiable consumer events since this makes debugging much easier
+ consumer = super(OffsetValidationTest, self).setup_consumer(topic, **kwargs)
+ self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
+ return consumer
+
+ def test_broker_rolling_bounce(self):
+ """
+ Verify correct consumer behavior when the brokers are consecutively restarted.
+
+ Setup: single Kafka cluster with one producer writing messages to a single topic with one
+ partition, and a set of consumers in the same group reading from the same topic.
+
+ - Start a producer which continues producing new messages throughout the test.
+ - Start up the consumers and wait until they've joined the group.
+ - In a loop, restart each broker consecutively, waiting for the group to stabilize between
+ each broker restart.
+ - Verify delivery semantics according to the failure type and that the broker bounces
+ did not cause unexpected group rebalances.
+ """
+ partition = TopicPartition(self.TOPIC, 0)
+
+ producer = self.setup_producer(self.TOPIC)
+ consumer = self.setup_consumer(self.TOPIC)
+
+ producer.start()
+ self.await_produced_messages(producer)
+
+ consumer.start()
+ self.await_all_members(consumer)
+
+ num_rebalances = consumer.num_rebalances()
+ # TODO: make this test work with hard shutdowns, which probably requires
+ # pausing before the node is restarted to ensure that any ephemeral
+ # nodes have time to expire
+ self.rolling_bounce_brokers(consumer, clean_shutdown=True)
+
+ unexpected_rebalances = consumer.num_rebalances() - num_rebalances
+ assert unexpected_rebalances == 0, \
+ "Broker rolling bounce caused %d unexpected group rebalances" % unexpected_rebalances
+
+ consumer.stop_all()
+
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+
+ @matrix(clean_shutdown=[True, False], bounce_mode=["all", "rolling"])
+ def test_consumer_bounce(self, clean_shutdown, bounce_mode):
+ """
+ Verify correct consumer behavior when the consumers in the group are consecutively restarted.
+
+ Setup: single Kafka cluster with one producer and a set of consumers in one group.
+
+ - Start a producer which continues producing new messages throughout the test.
+ - Start up the consumers and wait until they've joined the group.
+ - In a loop, restart each consumer, waiting for each one to rejoin the group before
+ restarting the rest.
+ - Verify delivery semantics according to the failure type.
+ """
+ partition = TopicPartition(self.TOPIC, 0)
+
+ producer = self.setup_producer(self.TOPIC)
+ consumer = self.setup_consumer(self.TOPIC)
+
+ producer.start()
+ self.await_produced_messages(producer)
+
+ consumer.start()
+ self.await_all_members(consumer)
+
+ if bounce_mode == "all":
+ self.bounce_all_consumers(consumer, clean_shutdown=clean_shutdown)
+ else:
+ self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
+
+ consumer.stop_all()
+ if clean_shutdown:
+ # if the total records consumed matches the current position, we haven't seen any duplicates
+ # this can only be guaranteed with a clean shutdown
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+ else:
+ # we may have duplicates in a hard failure
+ assert consumer.current_position(partition) <= consumer.total_consumed(), \
+ "Current position greater than the total number of consumed records"
+
+ @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
+ def test_consumer_failure(self, clean_shutdown, enable_autocommit):
+ partition = TopicPartition(self.TOPIC, 0)
+
+ consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
+ producer = self.setup_producer(self.TOPIC)
+
+ consumer.start()
+ self.await_all_members(consumer)
+
+ partition_owner = consumer.owner(partition)
+ assert partition_owner is not None
+
+ # startup the producer and ensure that some records have been written
+ producer.start()
+ self.await_produced_messages(producer)
+
+ # stop the partition owner and await its shutdown
+ consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown)
+ wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers - 1) and consumer.owner(partition) != None,
+ timeout_sec=self.session_timeout_sec+5, err_msg="Timed out waiting for consumer to close")
+
+ # ensure that the remaining consumer does some work after rebalancing
+ self.await_consumed_messages(consumer, min_messages=1000)
+
+ consumer.stop_all()
+
+ if clean_shutdown:
+ # if the total records consumed matches the current position, we haven't seen any duplicates
+ # this can only be guaranteed with a clean shutdown
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+ else:
+ # we may have duplicates in a hard failure
+ assert consumer.current_position(partition) <= consumer.total_consumed(), \
+ "Current position greater than the total number of consumed records"
+
+ # if autocommit is not turned on, we can also verify the last committed offset
+ if not enable_autocommit:
+ assert consumer.last_commit(partition) == consumer.current_position(partition), \
+ "Last committed offset did not match last consumed position"
+
+
+ @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
+ def test_broker_failure(self, clean_shutdown, enable_autocommit):
+ partition = TopicPartition(self.TOPIC, 0)
+
+ consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
+ producer = self.setup_producer(self.TOPIC)
+
+ producer.start()
+ consumer.start()
+ self.await_all_members(consumer)
+
+ num_rebalances = consumer.num_rebalances()
+
+ # shutdown one of the brokers
+ # TODO: we need a way to target the coordinator instead of picking arbitrarily
+ self.kafka.signal_node(self.kafka.nodes[0], signal.SIGTERM if clean_shutdown else signal.SIGKILL)
+
+ # ensure that the consumers do some work after the broker failure
+ self.await_consumed_messages(consumer, min_messages=1000)
+
+ # verify that there were no rebalances on failover
+ assert num_rebalances == consumer.num_rebalances(), "Broker failure should not cause a rebalance"
+
+ consumer.stop_all()
+
+ # if the total records consumed matches the current position, we haven't seen any duplicates
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+
+ # if autocommit is not turned on, we can also verify the last committed offset
+ if not enable_autocommit:
+ assert consumer.last_commit(partition) == consumer.current_position(partition), \
+ "Last committed offset did not match last consumed position"
+
+ def test_group_consumption(self):
+ """
+ Verifies correct group rebalance behavior as consumers are started and stopped.
+ In particular, this test verifies that the partition is readable after every
+ expected rebalance.
+
+ Setup: single Kafka cluster with a group of consumers reading from one topic
+ with one partition while the verifiable producer writes to it.
+
+ - Start the consumers one by one, verifying consumption after each rebalance
+ - Shutdown the consumers one by one, verifying consumption after each rebalance
+ """
+ consumer = self.setup_consumer(self.TOPIC)
+ producer = self.setup_producer(self.TOPIC)
+
+ partition = TopicPartition(self.TOPIC, 0)
+
+ producer.start()
+
+ for num_started, node in enumerate(consumer.nodes, 1):
+ consumer.start_node(node)
+ self.await_members(consumer, num_started)
+ self.await_consumed_messages(consumer)
+
+ for num_stopped, node in enumerate(consumer.nodes, 1):
+ consumer.stop_node(node)
+
+ if num_stopped < self.num_consumers:
+ self.await_members(consumer, self.num_consumers - num_stopped)
+ self.await_consumed_messages(consumer)
+
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+
+ assert consumer.last_commit(partition) == consumer.current_position(partition), \
+ "Last committed offset did not match last consumed position"
+
+
+class AssignmentValidationTest(VerifiableConsumerTest):
+ TOPIC = "test_topic"
+ NUM_PARTITIONS = 6
+
+ def __init__(self, test_context):
+ super(AssignmentValidationTest, self).__init__(test_context, num_consumers=3, num_producers=0,
+ num_zk=1, num_brokers=2, topics={
+ self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 },
+ })
+
+ @matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
+ "org.apache.kafka.clients.consumer.RoundRobinAssignor"])
+ def test_valid_assignment(self, assignment_strategy):
+ """
+ Verify assignment strategy correctness: each partition is assigned to exactly
+ one consumer instance.
+
+ Setup: single Kafka cluster with a set of consumers in the same group.
+
+ - Start the consumers one by one
+ - Validate assignment after every expected rebalance
+ """
+ consumer = self.setup_consumer(self.TOPIC, assignment_strategy=assignment_strategy)
+ for num_started, node in enumerate(consumer.nodes, 1):
+ consumer.start_node(node)
+ self.await_members(consumer, num_started)
+ assert self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment())
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/__init__.py b/tests/kafkatest/tests/core/__init__.py
deleted file mode 100644
index ec20143..0000000
--- a/tests/kafkatest/tests/core/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
deleted file mode 100644
index d6a0a12..0000000
--- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Copyright 2015 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka import config_property
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-from kafkatest.version import LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion
-
-
-# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
-class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
-
- def __init__(self, test_context):
- super(ClientCompatibilityTestNewBroker, self).__init__(test_context=test_context)
-
- def setUp(self):
- self.topic = "test_topic"
- self.zk = ZookeeperService(self.test_context, num_nodes=1)
-
- self.zk.start()
-
- # Producer and consumer
- self.producer_throughput = 10000
- self.num_producers = 1
- self.num_consumers = 1
- self.messages_per_producer = 1000
-
- @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None)
- @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
- @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=None)
- @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
- @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"], timestamp_type=None)
- @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("CreateTime"))
- @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
- @parametrize(producer_version=str(LATEST_0_10_0), consumer_version=str(LATEST_0_10_0), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
- @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
- @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=str("LogAppendTime"))
- def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None):
-
- self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
- "partitions": 3,
- "replication-factor": 3,
- 'configs': {"min.insync.replicas": 2}}})
- for node in self.kafka.nodes:
- if timestamp_type is not None:
- node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
- self.kafka.start()
-
- self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
- self.topic, throughput=self.producer_throughput,
- message_validator=is_int,
- compression_types=compression_types,
- version=KafkaVersion(producer_version))
-
- self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
- self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
- message_validator=is_int, version=KafkaVersion(consumer_version))
-
- self.run_produce_consume_validate(lambda: wait_until(
- lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
- timeout_sec=120, backoff_sec=1,
- err_msg="Producer did not produce all messages in reasonable amount of time"))
http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/consumer_group_command_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/consumer_group_command_test.py b/tests/kafkatest/tests/core/consumer_group_command_test.py
deleted file mode 100644
index c3f59d9..0000000
--- a/tests/kafkatest/tests/core/consumer_group_command_test.py
+++ /dev/null
@@ -1,106 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ducktape.utils.util import wait_until
-from ducktape.tests.test import Test
-from ducktape.mark import matrix
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.security.security_config import SecurityConfig
-
-import os
-import re
-
-TOPIC = "topic-consumer-group-command"
-
-class ConsumerGroupCommandTest(Test):
- """
- Tests ConsumerGroupCommand
- """
- # Root directory for persistent output
- PERSISTENT_ROOT = "/mnt/consumer_group_command"
- COMMAND_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "command.properties")
-
- def __init__(self, test_context):
- super(ConsumerGroupCommandTest, self).__init__(test_context)
- self.num_zk = 1
- self.num_brokers = 1
- self.topics = {
- TOPIC: {'partitions': 1, 'replication-factor': 1}
- }
- self.zk = ZookeeperService(test_context, self.num_zk)
-
- def setUp(self):
- self.zk.start()
-
- def start_kafka(self, security_protocol, interbroker_security_protocol):
- self.kafka = KafkaService(
- self.test_context, self.num_brokers,
- self.zk, security_protocol=security_protocol,
- interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
- self.kafka.start()
-
- def start_consumer(self, security_protocol):
- enable_new_consumer = security_protocol == SecurityConfig.SSL
- self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
- consumer_timeout_ms=None, new_consumer=enable_new_consumer)
- self.consumer.start()
-
- def setup_and_verify(self, security_protocol, group=None):
- self.start_kafka(security_protocol, security_protocol)
- self.start_consumer(security_protocol)
- consumer_node = self.consumer.nodes[0]
- wait_until(lambda: self.consumer.alive(consumer_node),
- timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
- kafka_node = self.kafka.nodes[0]
- if security_protocol is not SecurityConfig.PLAINTEXT:
- prop_file = str(self.kafka.security_config.client_config())
- self.logger.debug(prop_file)
- kafka_node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
- kafka_node.account.create_file(self.COMMAND_CONFIG_FILE, prop_file)
-
- # Verify ConsumerGroupCommand lists expected consumer groups
- enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
- command_config_file = None
- if enable_new_consumer:
- command_config_file = self.COMMAND_CONFIG_FILE
-
- if group:
- wait_until(lambda: re.search("topic-consumer-group-command",self.kafka.describe_consumer_group(group=group, node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file)), timeout_sec=10,
- err_msg="Timed out waiting to list expected consumer groups.")
- else:
- wait_until(lambda: "test-consumer-group" in self.kafka.list_consumer_groups(node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10,
- err_msg="Timed out waiting to list expected consumer groups.")
-
- self.consumer.stop()
-
- @matrix(security_protocol=['PLAINTEXT', 'SSL'])
- def test_list_consumer_groups(self, security_protocol='PLAINTEXT'):
- """
- Tests if ConsumerGroupCommand is listing correct consumer groups
- :return: None
- """
- self.setup_and_verify(security_protocol)
-
- @matrix(security_protocol=['PLAINTEXT', 'SSL'])
- def test_describe_consumer_group(self, security_protocol='PLAINTEXT'):
- """
- Tests if ConsumerGroupCommand is describing a consumer group correctly
- :return: None
- """
- self.setup_and_verify(security_protocol, group="test-consumer-group")
http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/get_offset_shell_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/get_offset_shell_test.py b/tests/kafkatest/tests/core/get_offset_shell_test.py
deleted file mode 100644
index 38bd9dc..0000000
--- a/tests/kafkatest/tests/core/get_offset_shell_test.py
+++ /dev/null
@@ -1,91 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ducktape.utils.util import wait_until
-from ducktape.tests.test import Test
-from kafkatest.services.verifiable_producer import VerifiableProducer
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.security.security_config import SecurityConfig
-
-TOPIC = "topic-get-offset-shell"
-MAX_MESSAGES = 100
-NUM_PARTITIONS = 1
-REPLICATION_FACTOR = 1
-
-class GetOffsetShellTest(Test):
- """
- Tests GetOffsetShell tool
- """
- def __init__(self, test_context):
- super(GetOffsetShellTest, self).__init__(test_context)
- self.num_zk = 1
- self.num_brokers = 1
- self.messages_received_count = 0
- self.topics = {
- TOPIC: {'partitions': NUM_PARTITIONS, 'replication-factor': REPLICATION_FACTOR}
- }
-
- self.zk = ZookeeperService(test_context, self.num_zk)
-
-
-
- def setUp(self):
- self.zk.start()
-
- def start_kafka(self, security_protocol, interbroker_security_protocol):
- self.kafka = KafkaService(
- self.test_context, self.num_brokers,
- self.zk, security_protocol=security_protocol,
- interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
- self.kafka.start()
-
- def start_producer(self):
- # This will produce to kafka cluster
- self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, max_messages=MAX_MESSAGES)
- self.producer.start()
- current_acked = self.producer.num_acked
- wait_until(lambda: self.producer.num_acked >= current_acked + MAX_MESSAGES, timeout_sec=10,
- err_msg="Timeout awaiting messages to be produced and acked")
-
- def start_consumer(self, security_protocol):
- enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
- self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
- consumer_timeout_ms=1000, new_consumer=enable_new_consumer)
- self.consumer.start()
-
- def test_get_offset_shell(self, security_protocol='PLAINTEXT'):
- """
- Tests if GetOffsetShell is getting offsets correctly
- :return: None
- """
- self.start_kafka(security_protocol, security_protocol)
- self.start_producer()
-
- # Assert that offset fetched without any consumers consuming is 0
- assert self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, 0)
-
- self.start_consumer(security_protocol)
-
- node = self.consumer.nodes[0]
-
- wait_until(lambda: self.consumer.alive(node), timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
-
- # Assert that offset is correctly indicated by GetOffsetShell tool
- wait_until(lambda: "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, MAX_MESSAGES) in self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), timeout_sec=10,
- err_msg="Timed out waiting to reach expected offset.")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/mirror_maker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/mirror_maker_test.py b/tests/kafkatest/tests/core/mirror_maker_test.py
deleted file mode 100644
index afb1972..0000000
--- a/tests/kafkatest/tests/core/mirror_maker_test.py
+++ /dev/null
@@ -1,179 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize, matrix, ignore
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.mirror_maker import MirrorMaker
-from kafkatest.services.security.minikdc import MiniKdc
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-
-import time
-
-
-class TestMirrorMakerService(ProduceConsumeValidateTest):
- """Sanity checks on mirror maker service class."""
- def __init__(self, test_context):
- super(TestMirrorMakerService, self).__init__(test_context)
-
- self.topic = "topic"
- self.source_zk = ZookeeperService(test_context, num_nodes=1)
- self.target_zk = ZookeeperService(test_context, num_nodes=1)
-
- self.source_kafka = KafkaService(test_context, num_nodes=1, zk=self.source_zk,
- topics={self.topic: {"partitions": 1, "replication-factor": 1}})
- self.target_kafka = KafkaService(test_context, num_nodes=1, zk=self.target_zk,
- topics={self.topic: {"partitions": 1, "replication-factor": 1}})
- # This will produce to source kafka cluster
- self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.source_kafka, topic=self.topic,
- throughput=1000)
- self.mirror_maker = MirrorMaker(test_context, num_nodes=1, source=self.source_kafka, target=self.target_kafka,
- whitelist=self.topic, offset_commit_interval_ms=1000)
- # This will consume from target kafka cluster
- self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.target_kafka, topic=self.topic,
- message_validator=is_int, consumer_timeout_ms=60000)
-
- def setUp(self):
- # Source cluster
- self.source_zk.start()
-
- # Target cluster
- self.target_zk.start()
-
- def start_kafka(self, security_protocol):
- self.source_kafka.security_protocol = security_protocol
- self.source_kafka.interbroker_security_protocol = security_protocol
- self.target_kafka.security_protocol = security_protocol
- self.target_kafka.interbroker_security_protocol = security_protocol
- if self.source_kafka.security_config.has_sasl_kerberos:
- minikdc = MiniKdc(self.source_kafka.context, self.source_kafka.nodes + self.target_kafka.nodes)
- self.source_kafka.minikdc = minikdc
- self.target_kafka.minikdc = minikdc
- minikdc.start()
- self.source_kafka.start()
- self.target_kafka.start()
-
- def bounce(self, clean_shutdown=True):
- """Bounce mirror maker with a clean (kill -15) or hard (kill -9) shutdown"""
-
- # Wait until messages start appearing in the target cluster
- wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=15)
-
- # Wait for at least one offset to be committed.
- #
- # This step is necessary to prevent data loss with default mirror maker settings:
- # currently, if we don't have at least one committed offset,
- # and we bounce mirror maker, the consumer internals will throw OffsetOutOfRangeException, and the default
- # auto.offset.reset policy ("largest") will kick in, causing mirrormaker to start consuming from the largest
- # offset. As a result, any messages produced to the source cluster while mirrormaker was dead won't get
- # mirrored to the target cluster.
- # (see https://issues.apache.org/jira/browse/KAFKA-2759)
- #
- # This isn't necessary with kill -15 because mirror maker commits its offsets during graceful
- # shutdown.
- if not clean_shutdown:
- time.sleep(self.mirror_maker.offset_commit_interval_ms / 1000.0 + .5)
-
- for i in range(3):
- self.logger.info("Bringing mirror maker nodes down...")
- for node in self.mirror_maker.nodes:
- self.mirror_maker.stop_node(node, clean_shutdown=clean_shutdown)
-
- num_consumed = len(self.consumer.messages_consumed[1])
- self.logger.info("Bringing mirror maker nodes back up...")
- for node in self.mirror_maker.nodes:
- self.mirror_maker.start_node(node)
-
- # Ensure new messages are once again showing up on the target cluster
- # new consumer requires higher timeout here
- wait_until(lambda: len(self.consumer.messages_consumed[1]) > num_consumed + 100, timeout_sec=60)
-
- def wait_for_n_messages(self, n_messages=100):
- """Wait for a minimum number of messages to be successfully produced."""
- wait_until(lambda: self.producer.num_acked > n_messages, timeout_sec=10,
- err_msg="Producer failed to produce %d messages in a reasonable amount of time." % n_messages)
-
- @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
- @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], new_consumer=[True])
- def test_simple_end_to_end(self, security_protocol, new_consumer):
- """
- Test end-to-end behavior under non-failure conditions.
-
- Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster.
- One is source, and the other is target. Single-node mirror maker mirrors from source to target.
-
- - Start mirror maker.
- - Produce a small number of messages to the source cluster.
- - Consume messages from target.
- - Verify that number of consumed messages matches the number produced.
- """
- self.start_kafka(security_protocol)
- self.consumer.new_consumer = new_consumer
-
- self.mirror_maker.new_consumer = new_consumer
- self.mirror_maker.start()
-
- mm_node = self.mirror_maker.nodes[0]
- with mm_node.account.monitor_log(self.mirror_maker.LOG_FILE) as monitor:
- if new_consumer:
- monitor.wait_until("Resetting offset for partition", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
- else:
- monitor.wait_until("reset fetch offset", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
-
- self.run_produce_consume_validate(core_test_action=self.wait_for_n_messages)
- self.mirror_maker.stop()
-
- @matrix(offsets_storage=["kafka", "zookeeper"], new_consumer=[False], clean_shutdown=[True, False])
- @matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
- def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True, security_protocol='PLAINTEXT'):
- """
- Test end-to-end behavior under failure conditions.
-
- Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster.
- One is source, and the other is target. Single-node mirror maker mirrors from source to target.
-
- - Start mirror maker.
- - Produce to source cluster, and consume from target cluster in the background.
- - Bounce MM process
- - Verify every message acknowledged by the source producer is consumed by the target consumer
- """
- if new_consumer and not clean_shutdown:
- # Increase timeout on downstream console consumer; mirror maker with new consumer takes extra time
- # during hard bounce. This is because the restarted mirror maker consumer won't be able to rejoin
- # the group until the previous session times out
- self.consumer.consumer_timeout_ms = 60000
-
- self.start_kafka(security_protocol)
- self.consumer.new_consumer = new_consumer
-
- self.mirror_maker.offsets_storage = offsets_storage
- self.mirror_maker.new_consumer = new_consumer
- self.mirror_maker.start()
-
- # Wait until mirror maker has reset fetch offset at least once before continuing with the rest of the test
- mm_node = self.mirror_maker.nodes[0]
- with mm_node.account.monitor_log(self.mirror_maker.LOG_FILE) as monitor:
- if new_consumer:
- monitor.wait_until("Resetting offset for partition", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
- else:
- monitor.wait_until("reset fetch offset", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
-
- self.run_produce_consume_validate(core_test_action=lambda: self.bounce(clean_shutdown=clean_shutdown))
- self.mirror_maker.stop()
http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/reassign_partitions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/reassign_partitions_test.py b/tests/kafkatest/tests/core/reassign_partitions_test.py
deleted file mode 100644
index 850e2aa..0000000
--- a/tests/kafkatest/tests/core/reassign_partitions_test.py
+++ /dev/null
@@ -1,110 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-import random
-
-class ReassignPartitionsTest(ProduceConsumeValidateTest):
- """
- These tests validate partition reassignment.
- Create a topic with few partitions, load some data, trigger partition re-assignment with and without broker failure,
- check that partition re-assignment can complete and there is no data loss.
- """
-
- def __init__(self, test_context):
- """:type test_context: ducktape.tests.test.TestContext"""
- super(ReassignPartitionsTest, self).__init__(test_context=test_context)
-
- self.topic = "test_topic"
- self.zk = ZookeeperService(test_context, num_nodes=1)
- self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk, topics={self.topic: {
- "partitions": 20,
- "replication-factor": 3,
- 'configs': {"min.insync.replicas": 2}}
- })
- self.num_partitions = 20
- self.timeout_sec = 60
- self.producer_throughput = 1000
- self.num_producers = 1
- self.num_consumers = 1
-
- def setUp(self):
- self.zk.start()
-
- def min_cluster_size(self):
- # Override this since we're adding services outside of the constructor
- return super(ReassignPartitionsTest, self).min_cluster_size() + self.num_producers + self.num_consumers
-
- def clean_bounce_some_brokers(self):
- """Bounce every other broker"""
- for node in self.kafka.nodes[::2]:
- self.kafka.restart_node(node, clean_shutdown=True)
-
- def reassign_partitions(self, bounce_brokers):
- partition_info = self.kafka.parse_describe_topic(self.kafka.describe_topic(self.topic))
- self.logger.debug("Partitions before reassignment:" + str(partition_info))
-
- # jumble partition assignment in dictionary
- seed = random.randint(0, 2 ** 31 - 1)
- self.logger.debug("Jumble partition assignment with seed " + str(seed))
- random.seed(seed)
- # The list may still be in order, but that's ok
- shuffled_list = range(0, self.num_partitions)
- random.shuffle(shuffled_list)
-
- for i in range(0, self.num_partitions):
- partition_info["partitions"][i]["partition"] = shuffled_list[i]
- self.logger.debug("Jumbled partitions: " + str(partition_info))
-
- # send reassign partitions command
- self.kafka.execute_reassign_partitions(partition_info)
-
- if bounce_brokers:
- # bounce a few brokers at the same time
- self.clean_bounce_some_brokers()
-
- # Wait until finished or timeout
- wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info), timeout_sec=self.timeout_sec, backoff_sec=.5)
-
- @parametrize(security_protocol="PLAINTEXT", bounce_brokers=True)
- @parametrize(security_protocol="PLAINTEXT", bounce_brokers=False)
- def test_reassign_partitions(self, bounce_brokers, security_protocol):
- """Reassign partitions tests.
- Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
-
- - Produce messages in the background
- - Consume messages in the background
- - Reassign partitions
- - If bounce_brokers is True, also bounce a few brokers while partition re-assignment is in progress
- - When done reassigning partitions and bouncing brokers, stop producing, and finish consuming
- - Validate that every acked message was consumed
- """
-
- self.kafka.security_protocol = security_protocol
- self.kafka.interbroker_security_protocol = security_protocol
- new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True
- self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
- self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
- self.kafka.start()
-
- self.run_produce_consume_validate(core_test_action=lambda: self.reassign_partitions(bounce_brokers))
http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py
deleted file mode 100644
index f815034..0000000
--- a/tests/kafkatest/tests/core/replication_test.py
+++ /dev/null
@@ -1,154 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.utils.util import wait_until
-
-from ducktape.mark import matrix
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-
-import signal
-
-def broker_node(test, broker_type):
- """ Discover node of requested type. For leader type, discovers leader for our topic and partition 0
- """
- if broker_type == "leader":
- node = test.kafka.leader(test.topic, partition=0)
- elif broker_type == "controller":
- node = test.kafka.controller()
- else:
- raise Exception("Unexpected broker type %s." % (broker_type))
-
- return node
-
-def clean_shutdown(test, broker_type):
- """Discover broker node of requested type and shut it down cleanly.
- """
- node = broker_node(test, broker_type)
- test.kafka.signal_node(node, sig=signal.SIGTERM)
-
-
-def hard_shutdown(test, broker_type):
- """Discover broker node of requested type and shut it down with a hard kill."""
- node = broker_node(test, broker_type)
- test.kafka.signal_node(node, sig=signal.SIGKILL)
-
-
-def clean_bounce(test, broker_type):
- """Chase the leader of one partition and restart it cleanly."""
- for i in range(5):
- prev_broker_node = broker_node(test, broker_type)
- test.kafka.restart_node(prev_broker_node, clean_shutdown=True)
-
-
-def hard_bounce(test, broker_type):
- """Chase the leader and restart it with a hard kill."""
- for i in range(5):
- prev_broker_node = broker_node(test, broker_type)
- test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL)
-
- # Since this is a hard kill, we need to make sure the process is down and that
- # zookeeper has registered the loss by expiring the broker's session timeout.
-
- wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node),
- timeout_sec=test.kafka.zk_session_timeout + 5,
- err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account))
-
- test.kafka.start_node(prev_broker_node)
-
-failures = {
- "clean_shutdown": clean_shutdown,
- "hard_shutdown": hard_shutdown,
- "clean_bounce": clean_bounce,
- "hard_bounce": hard_bounce
-}
-
-
-class ReplicationTest(ProduceConsumeValidateTest):
- """
- Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
- (foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
- too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose
- ordering guarantees.
-
- Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked,
- we might exit early if some messages are duplicated (though not an issue here since producer retries==0)
-
- Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively
- consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
- indicator that nothing is left to consume.
- """
-
- def __init__(self, test_context):
- """:type test_context: ducktape.tests.test.TestContext"""
- super(ReplicationTest, self).__init__(test_context=test_context)
-
- self.topic = "test_topic"
- self.zk = ZookeeperService(test_context, num_nodes=1)
- self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
- "partitions": 3,
- "replication-factor": 3,
- 'configs': {"min.insync.replicas": 2}}
- })
- self.producer_throughput = 1000
- self.num_producers = 1
- self.num_consumers = 1
-
- def setUp(self):
- self.zk.start()
-
- def min_cluster_size(self):
- """Override this since we're adding services outside of the constructor"""
- return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
-
-
- @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
- broker_type=["leader"],
- security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"])
- @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
- broker_type=["controller"],
- security_protocol=["PLAINTEXT", "SASL_SSL"])
- @matrix(failure_mode=["hard_bounce"],
- broker_type=["leader"],
- security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"], interbroker_sasl_mechanism=["PLAIN", "GSSAPI"])
- def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type, client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI"):
- """Replication tests.
- These tests verify that replication provides simple durability guarantees by checking that data acked by
- brokers is still available for consumption in the face of various failure scenarios.
-
- Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
-
- - Produce messages in the background
- - Consume messages in the background
- - Drive broker failures (shutdown, or bounce repeatedly with kill -15 or kill -9)
- - When done driving failures, stop producing, and finish consuming
- - Validate that every acked message was consumed
- """
-
- self.kafka.security_protocol = security_protocol
- self.kafka.interbroker_security_protocol = security_protocol
- self.kafka.client_sasl_mechanism = client_sasl_mechanism
- self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
- new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True
- self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
- self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
- self.kafka.start()
-
- self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self, broker_type))
http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
deleted file mode 100644
index 51b2e60..0000000
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ /dev/null
@@ -1,190 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.utils import is_int
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from ducktape.mark import parametrize
-from ducktape.mark import matrix
-from kafkatest.services.security.kafka_acls import ACLs
-import time
-
-
-class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
- """Tests a rolling upgrade from PLAINTEXT to a secured cluster
- """
-
- def __init__(self, test_context):
- super(TestSecurityRollingUpgrade, self).__init__(test_context=test_context)
-
- def setUp(self):
- self.acls = ACLs(self.test_context)
- self.topic = "test_topic"
- self.group = "group"
- self.producer_throughput = 100
- self.num_producers = 1
- self.num_consumers = 1
- self.zk = ZookeeperService(self.test_context, num_nodes=1)
- self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
- "partitions": 3,
- "replication-factor": 3,
- 'configs': {"min.insync.replicas": 2}}})
- self.zk.start()
-
- def create_producer_and_consumer(self):
- self.producer = VerifiableProducer(
- self.test_context, self.num_producers, self.kafka, self.topic,
- throughput=self.producer_throughput)
-
- self.consumer = ConsoleConsumer(
- self.test_context, self.num_consumers, self.kafka, self.topic,
- consumer_timeout_ms=60000, message_validator=is_int)
-
- self.consumer.group_id = "group"
-
- def bounce(self):
- self.kafka.start_minikdc()
- for node in self.kafka.nodes:
- self.kafka.stop_node(node)
- self.kafka.start_node(node)
- time.sleep(10)
-
- def roll_in_secured_settings(self, client_protocol, broker_protocol):
-
- # Roll cluster to include inter broker security protocol.
- self.kafka.interbroker_security_protocol = broker_protocol
- self.kafka.open_port(client_protocol)
- self.kafka.open_port(broker_protocol)
- self.bounce()
-
- # Roll cluster to disable PLAINTEXT port
- self.kafka.close_port('PLAINTEXT')
- self.set_authorizer_and_bounce(client_protocol, broker_protocol)
-
- def set_authorizer_and_bounce(self, client_protocol, broker_protocol):
- self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
- self.acls.set_acls(client_protocol, self.kafka, self.zk, self.topic, self.group)
- self.acls.set_acls(broker_protocol, self.kafka, self.zk, self.topic, self.group)
- self.bounce()
-
- def open_secured_port(self, client_protocol):
- self.kafka.security_protocol = client_protocol
- self.kafka.open_port(client_protocol)
- self.kafka.start_minikdc()
- self.bounce()
-
- def add_sasl_mechanism(self, new_client_sasl_mechanism):
- self.kafka.client_sasl_mechanism = new_client_sasl_mechanism
- self.kafka.start_minikdc()
- self.bounce()
-
- def roll_in_sasl_mechanism(self, security_protocol, new_sasl_mechanism):
- # Roll cluster to update inter-broker SASL mechanism. This disables the old mechanism.
- self.kafka.interbroker_sasl_mechanism = new_sasl_mechanism
- self.bounce()
-
- # Bounce again with ACLs for new mechanism
- self.set_authorizer_and_bounce(security_protocol, security_protocol)
-
- @matrix(client_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"])
- def test_rolling_upgrade_phase_one(self, client_protocol):
- """
- Start with a PLAINTEXT cluster, open a SECURED port, via a rolling upgrade, ensuring we could produce
- and consume throughout over PLAINTEXT. Finally check we can produce and consume the new secured port.
- """
- self.kafka.interbroker_security_protocol = "PLAINTEXT"
- self.kafka.security_protocol = "PLAINTEXT"
- self.kafka.start()
-
- # Create PLAINTEXT producer and consumer
- self.create_producer_and_consumer()
-
- # Rolling upgrade, opening a secure protocol, ensuring the Plaintext producer/consumer continues to run
- self.run_produce_consume_validate(self.open_secured_port, client_protocol)
-
- # Now we can produce and consume via the secured port
- self.kafka.security_protocol = client_protocol
- self.create_producer_and_consumer()
- self.run_produce_consume_validate(lambda: time.sleep(1))
-
- @matrix(client_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"], broker_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"])
- def test_rolling_upgrade_phase_two(self, client_protocol, broker_protocol):
- """
- Start with a PLAINTEXT cluster with a second Secured port open (i.e. result of phase one).
- Start an Producer and Consumer via the SECURED port
- Incrementally upgrade to add inter-broker be the secure protocol
- Incrementally upgrade again to add ACLs as well as disabling the PLAINTEXT port
- Ensure the producer and consumer ran throughout
- """
- #Given we have a broker that has both secure and PLAINTEXT ports open
- self.kafka.security_protocol = client_protocol
- self.kafka.interbroker_security_protocol = "PLAINTEXT"
- self.kafka.start()
-
- #Create Secured Producer and Consumer
- self.create_producer_and_consumer()
-
- #Roll in the security protocol. Disable Plaintext. Ensure we can produce and Consume throughout
- self.run_produce_consume_validate(self.roll_in_secured_settings, client_protocol, broker_protocol)
-
- @parametrize(new_client_sasl_mechanism='PLAIN')
- def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism):
- """
- Start with a SASL/GSSAPI cluster, add new SASL mechanism, via a rolling upgrade, ensuring we could produce
- and consume throughout over SASL/GSSAPI. Finally check we can produce and consume using new mechanism.
- """
- self.kafka.interbroker_security_protocol = "SASL_SSL"
- self.kafka.security_protocol = "SASL_SSL"
- self.kafka.client_sasl_mechanism = "GSSAPI"
- self.kafka.interbroker_sasl_mechanism = "GSSAPI"
- self.kafka.start()
-
- # Create SASL/GSSAPI producer and consumer
- self.create_producer_and_consumer()
-
- # Rolling upgrade, adding new SASL mechanism, ensuring the GSSAPI producer/consumer continues to run
- self.run_produce_consume_validate(self.add_sasl_mechanism, new_client_sasl_mechanism)
-
- # Now we can produce and consume using the new SASL mechanism
- self.kafka.client_sasl_mechanism = new_client_sasl_mechanism
- self.create_producer_and_consumer()
- self.run_produce_consume_validate(lambda: time.sleep(1))
-
- @parametrize(new_sasl_mechanism='PLAIN')
- def test_rolling_upgrade_sasl_mechanism_phase_two(self, new_sasl_mechanism):
- """
- Start with a SASL cluster with GSSAPI for inter-broker and a second mechanism for clients (i.e. result of phase one).
- Start Producer and Consumer using the second mechanism
- Incrementally upgrade to set inter-broker to the second mechanism and disable GSSAPI
- Incrementally upgrade again to add ACLs
- Ensure the producer and consumer run throughout
- """
- #Start with a broker that has GSSAPI for inter-broker and a second mechanism for clients
- self.kafka.security_protocol = "SASL_SSL"
- self.kafka.interbroker_security_protocol = "SASL_SSL"
- self.kafka.client_sasl_mechanism = new_sasl_mechanism
- self.kafka.interbroker_sasl_mechanism = "GSSAPI"
- self.kafka.start()
-
- #Create Producer and Consumer using second mechanism
- self.create_producer_and_consumer()
-
- #Roll in the second SASL mechanism for inter-broker, disabling first mechanism. Ensure we can produce and consume throughout
- self.run_produce_consume_validate(self.roll_in_sasl_mechanism, self.kafka.security_protocol, new_sasl_mechanism)
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/security_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py
deleted file mode 100644
index b6bc656..0000000
--- a/tests/kafkatest/tests/core/security_test.py
+++ /dev/null
@@ -1,106 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.services.security.security_config import SslStores
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-import time
-
-class TestSslStores(SslStores):
- def __init__(self):
- super(TestSslStores, self).__init__()
- self.invalid_hostname = False
- self.generate_ca()
- self.generate_truststore()
-
- def hostname(self, node):
- if (self.invalid_hostname):
- return "invalidhost"
- else:
- return super(TestSslStores, self).hostname(node)
-
-class SecurityTest(ProduceConsumeValidateTest):
- """
- These tests validate security features.
- """
-
- def __init__(self, test_context):
- """:type test_context: ducktape.tests.test.TestContext"""
- super(SecurityTest, self).__init__(test_context=test_context)
-
- self.topic = "test_topic"
- self.zk = ZookeeperService(test_context, num_nodes=1)
- self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
- "partitions": 2,
- "replication-factor": 1}
- })
- self.num_partitions = 2
- self.timeout_sec = 10000
- self.producer_throughput = 1000
- self.num_producers = 1
- self.num_consumers = 1
-
- def setUp(self):
- self.zk.start()
-
- @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='SSL')
- @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
- def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol):
- """
- Test that invalid hostname in certificate results in connection failures.
- When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure.
- When security_protocol=PLAINTEXT and interbroker_security_protocol=SSL, controller connections fail
- with hostname verification failure. Hence clients are expected to fail with LEADER_NOT_AVAILABLE.
- """
-
- self.kafka.security_protocol = security_protocol
- self.kafka.interbroker_security_protocol = interbroker_security_protocol
- SecurityConfig.ssl_stores = TestSslStores()
-
- SecurityConfig.ssl_stores.invalid_hostname = True
- self.kafka.start()
- self.create_producer_and_consumer()
- self.producer.log_level = "TRACE"
- self.producer.start()
- self.consumer.start()
- time.sleep(10)
- assert self.producer.num_acked == 0, "Messages published successfully, endpoint validation did not fail with invalid hostname"
- error = 'SSLHandshakeException' if security_protocol is 'SSL' else 'LEADER_NOT_AVAILABLE'
- for node in self.producer.nodes:
- node.account.ssh("grep %s %s" % (error, self.producer.LOG_FILE))
- for node in self.consumer.nodes:
- node.account.ssh("grep %s %s" % (error, self.consumer.LOG_FILE))
-
- self.producer.stop()
- self.consumer.stop()
- self.producer.log_level = "INFO"
-
- SecurityConfig.ssl_stores.invalid_hostname = False
- for node in self.kafka.nodes:
- self.kafka.restart_node(node, clean_shutdown=True)
- self.create_producer_and_consumer()
- self.run_produce_consume_validate()
-
- def create_producer_and_consumer(self):
- self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
- self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=10000, message_validator=is_int)
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/simple_consumer_shell_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/simple_consumer_shell_test.py b/tests/kafkatest/tests/core/simple_consumer_shell_test.py
deleted file mode 100644
index 74a7eeb..0000000
--- a/tests/kafkatest/tests/core/simple_consumer_shell_test.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ducktape.utils.util import wait_until
-from ducktape.tests.test import Test
-from kafkatest.services.simple_consumer_shell import SimpleConsumerShell
-from kafkatest.services.verifiable_producer import VerifiableProducer
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-TOPIC = "topic-simple-consumer-shell"
-MAX_MESSAGES = 100
-NUM_PARTITIONS = 1
-REPLICATION_FACTOR = 1
-
-class SimpleConsumerShellTest(Test):
- """
- Tests SimpleConsumerShell tool
- """
- def __init__(self, test_context):
- super(SimpleConsumerShellTest, self).__init__(test_context)
- self.num_zk = 1
- self.num_brokers = 1
- self.messages_received_count = 0
- self.topics = {
- TOPIC: {'partitions': NUM_PARTITIONS, 'replication-factor': REPLICATION_FACTOR}
- }
-
- self.zk = ZookeeperService(test_context, self.num_zk)
-
- def setUp(self):
- self.zk.start()
-
- def start_kafka(self):
- self.kafka = KafkaService(
- self.test_context, self.num_brokers,
- self.zk, topics=self.topics)
- self.kafka.start()
-
- def run_producer(self):
- # This will produce to kafka cluster
- self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, max_messages=MAX_MESSAGES)
- self.producer.start()
- wait_until(lambda: self.producer.num_acked == MAX_MESSAGES, timeout_sec=10,
- err_msg="Timeout awaiting messages to be produced and acked")
-
- def start_simple_consumer_shell(self):
- self.simple_consumer_shell = SimpleConsumerShell(self.test_context, 1, self.kafka, TOPIC)
- self.simple_consumer_shell.start()
-
- def test_simple_consumer_shell(self):
- """
- Tests if SimpleConsumerShell is fetching expected records
- :return: None
- """
- self.start_kafka()
- self.run_producer()
- self.start_simple_consumer_shell()
-
- # Assert that SimpleConsumerShell is fetching expected number of messages
- wait_until(lambda: self.simple_consumer_shell.get_output().count("\n") == (MAX_MESSAGES + 1), timeout_sec=10,
- err_msg="Timed out waiting to receive expected number of messages.")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/core/throttling_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py
deleted file mode 100644
index 2e21322..0000000
--- a/tests/kafkatest/tests/core/throttling_test.py
+++ /dev/null
@@ -1,173 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import time
-import math
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.performance import ProducerPerformanceService
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.utils import is_int
-
-
-class ThrottlingTest(ProduceConsumeValidateTest):
- """Tests throttled partition reassignment. This is essentially similar
- to the reassign_partitions_test, except that we throttle the reassignment
- and verify that it takes a sensible amount of time given the throttle
- and the amount of data being moved.
-
- Since the correctness is time dependent, this test also simplifies the
- cluster topology. In particular, we fix the number of brokers, the
- replication-factor, the number of partitions, the partition size, and
- the number of partitions being moved so that we can accurately predict
- the time throttled reassignment should take.
- """
-
- def __init__(self, test_context):
- """:type test_context: ducktape.tests.test.TestContext"""
- super(ThrottlingTest, self).__init__(test_context=test_context)
-
- self.topic = "test_topic"
- self.zk = ZookeeperService(test_context, num_nodes=1)
- # Because we are starting the producer/consumer/validate cycle _after_
- # seeding the cluster with big data (to test throttling), we need to
- # Start the consumer from the end of the stream. further, we need to
- # ensure that the consumer is fully started before the producer starts
- # so that we don't miss any messages. This timeout ensures the sufficient
- # condition.
- self.consumer_init_timeout_sec = 10
- self.num_brokers = 6
- self.num_partitions = 3
- self.kafka = KafkaService(test_context,
- num_nodes=self.num_brokers,
- zk=self.zk,
- topics={
- self.topic: {
- "partitions": self.num_partitions,
- "replication-factor": 2,
- "configs": {
- "segment.bytes": 64 * 1024 * 1024
- }
- }
- })
- self.producer_throughput = 1000
- self.timeout_sec = 400
- self.num_records = 2000
- self.record_size = 4096 * 100 # 400 KB
- # 1 MB per partition on average.
- self.partition_size = (self.num_records * self.record_size) / self.num_partitions
- self.num_producers = 2
- self.num_consumers = 1
- self.throttle = 4 * 1024 * 1024 # 4 MB/s
-
- def setUp(self):
- self.zk.start()
-
- def min_cluster_size(self):
- # Override this since we're adding services outside of the constructor
- return super(ThrottlingTest, self).min_cluster_size() +\
- self.num_producers + self.num_consumers
-
- def clean_bounce_some_brokers(self):
- """Bounce every other broker"""
- for node in self.kafka.nodes[::2]:
- self.kafka.restart_node(node, clean_shutdown=True)
-
- def reassign_partitions(self, bounce_brokers, throttle):
- """This method reassigns partitions using a throttle. It makes an
- assertion about the minimum amount of time the reassignment should take
- given the value of the throttle, the number of partitions being moved,
- and the size of each partition.
- """
- partition_info = self.kafka.parse_describe_topic(
- self.kafka.describe_topic(self.topic))
- self.logger.debug("Partitions before reassignment:" +
- str(partition_info))
- max_num_moves = 0
- for i in range(0, self.num_partitions):
- old_replicas = set(partition_info["partitions"][i]["replicas"])
- new_part = (i+1) % self.num_partitions
- new_replicas = set(partition_info["partitions"][new_part]["replicas"])
- max_num_moves = max(len(new_replicas - old_replicas), max_num_moves)
- partition_info["partitions"][i]["partition"] = new_part
- self.logger.debug("Jumbled partitions: " + str(partition_info))
-
- self.kafka.execute_reassign_partitions(partition_info,
- throttle=throttle)
- start = time.time()
- if bounce_brokers:
- # bounce a few brokers at the same time
- self.clean_bounce_some_brokers()
-
- # Wait until finished or timeout
- size_per_broker = max_num_moves * self.partition_size
- self.logger.debug("Max amount of data transfer per broker: %fb",
- size_per_broker)
- estimated_throttled_time = math.ceil(float(size_per_broker) /
- self.throttle)
- estimated_time_with_buffer = estimated_throttled_time * 2
- self.logger.debug("Waiting %ds for the reassignment to complete",
- estimated_time_with_buffer)
- wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info),
- timeout_sec=estimated_time_with_buffer, backoff_sec=.5)
- stop = time.time()
- time_taken = stop - start
- self.logger.debug("Transfer took %d second. Estimated time : %ds",
- time_taken,
- estimated_throttled_time)
- assert time_taken >= estimated_throttled_time, \
- ("Expected rebalance to take at least %ds, but it took %ds" % (
- estimated_throttled_time,
- time_taken))
-
- @parametrize(bounce_brokers=False)
- @parametrize(bounce_brokers=True)
- def test_throttled_reassignment(self, bounce_brokers):
- security_protocol = 'PLAINTEXT'
- self.kafka.security_protocol = security_protocol
- self.kafka.interbroker_security_protocol = security_protocol
-
- producer_id = 'bulk_producer'
- bulk_producer = ProducerPerformanceService(
- context=self.test_context, num_nodes=1, kafka=self.kafka,
- topic=self.topic, num_records=self.num_records,
- record_size=self.record_size, throughput=-1, client_id=producer_id,
- jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id],
- jmx_attributes=['outgoing-byte-rate'])
-
-
- self.producer = VerifiableProducer(context=self.test_context,
- num_nodes=1,
- kafka=self.kafka, topic=self.topic,
- message_validator=is_int,
- throughput=self.producer_throughput)
-
- self.consumer = ConsoleConsumer(self.test_context,
- self.num_consumers,
- self.kafka,
- self.topic,
- consumer_timeout_ms=60000,
- message_validator=is_int,
- from_beginning=False)
-
- self.kafka.start()
- bulk_producer.run()
- self.run_produce_consume_validate(core_test_action=
- lambda: self.reassign_partitions(bounce_brokers, self.throttle))