You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/04/04 05:04:52 UTC
[2/4] kafka git commit: KAFKA-3483: Restructure ducktape tests to
simplify running subsets of tests
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
new file mode 100644
index 0000000..2c261df
--- /dev/null
+++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
@@ -0,0 +1,78 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.utils import is_int
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.services.kafka import config_property
+
+# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
+class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
+
+ def __init__(self, test_context):
+ super(ClientCompatibilityTestNewBroker, self).__init__(test_context=test_context)
+
+ def setUp(self):
+ self.topic = "test_topic"
+ self.zk = ZookeeperService(self.test_context, num_nodes=1)
+
+ self.zk.start()
+
+ # Producer and consumer
+ self.producer_throughput = 10000
+ self.num_producers = 1
+ self.num_consumers = 1
+ self.messages_per_producer = 1000
+
+ @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], timestamp_type=None)
+ @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None)
+ @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=None)
+ @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None)
+ @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=None)
+ @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("CreateTime"))
+ @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime"))
+ @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime"))
+ @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=str("LogAppendTime"))
+ def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=False, timestamp_type=None):
+
+ self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
+ "partitions": 3,
+ "replication-factor": 3,
+ 'configs': {"min.insync.replicas": 2}}})
+ for node in self.kafka.nodes:
+ if timestamp_type is not None:
+ node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
+ self.kafka.start()
+
+ self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+ self.topic, throughput=self.producer_throughput,
+ message_validator=is_int,
+ compression_types=compression_types,
+ version=KafkaVersion(producer_version))
+
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+ self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
+ message_validator=is_int, version=KafkaVersion(consumer_version))
+
+ self.run_produce_consume_validate(lambda: wait_until(
+ lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+ timeout_sec=120, backoff_sec=1,
+ err_msg="Producer did not produce all messages in reasonable amount of time"))
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/core/consumer_group_command_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/consumer_group_command_test.py b/tests/kafkatest/tests/core/consumer_group_command_test.py
new file mode 100644
index 0000000..1424d96
--- /dev/null
+++ b/tests/kafkatest/tests/core/consumer_group_command_test.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.utils.util import wait_until
+from ducktape.tests.test import Test
+from ducktape.mark import matrix
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.security.security_config import SecurityConfig
+
+import os
+import re
+
+TOPIC = "topic-consumer-group-command"
+
+class ConsumerGroupCommandTest(Test):
+ """
+ Tests ConsumerGroupCommand
+ """
+ # Root directory for persistent output
+ PERSISTENT_ROOT = "/mnt/consumer_group_command"
+ COMMAND_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "command.properties")
+
+ def __init__(self, test_context):
+ super(ConsumerGroupCommandTest, self).__init__(test_context)
+ self.num_zk = 1
+ self.num_brokers = 1
+ self.topics = {
+ TOPIC: {'partitions': 1, 'replication-factor': 1}
+ }
+ self.zk = ZookeeperService(test_context, self.num_zk)
+
+ def setUp(self):
+ self.zk.start()
+
+ def start_kafka(self, security_protocol, interbroker_security_protocol):
+ self.kafka = KafkaService(
+ self.test_context, self.num_brokers,
+ self.zk, security_protocol=security_protocol,
+ interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
+ self.kafka.start()
+
+ def start_consumer(self, security_protocol):
+ enable_new_consumer = security_protocol == SecurityConfig.SSL
+ self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
+ consumer_timeout_ms=None, new_consumer=enable_new_consumer)
+ self.consumer.start()
+
+ def setup_and_verify(self, security_protocol, group=None):
+ self.start_kafka(security_protocol, security_protocol)
+ self.start_consumer(security_protocol)
+ consumer_node = self.consumer.nodes[0]
+ wait_until(lambda: self.consumer.alive(consumer_node),
+ timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
+ kafka_node = self.kafka.nodes[0]
+ if security_protocol is not SecurityConfig.PLAINTEXT:
+ prop_file = str(self.kafka.security_config.client_config())
+ self.logger.debug(prop_file)
+ kafka_node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
+ kafka_node.account.create_file(self.COMMAND_CONFIG_FILE, prop_file)
+
+ # Verify ConsumerGroupCommand lists expected consumer groups
+ enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
+ command_config_file = None
+ if enable_new_consumer:
+ command_config_file = self.COMMAND_CONFIG_FILE
+
+ if group:
+ wait_until(lambda: re.search("%s\s+topic-consumer-group-command\s+0"%group,self.kafka.describe_consumer_group(group=group, node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file)), timeout_sec=10,
+ err_msg="Timed out waiting to list expected consumer groups.")
+ else:
+ wait_until(lambda: "test-consumer-group" in self.kafka.list_consumer_groups(node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10,
+ err_msg="Timed out waiting to list expected consumer groups.")
+
+ self.consumer.stop()
+
+ @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+ def test_list_consumer_groups(self, security_protocol='PLAINTEXT'):
+ """
+ Tests if ConsumerGroupCommand is listing correct consumer groups
+ :return: None
+ """
+ self.setup_and_verify(security_protocol)
+
+ @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+ def test_describe_consumer_group(self, security_protocol='PLAINTEXT'):
+ """
+ Tests if ConsumerGroupCommand is describing a consumer group correctly
+ :return: None
+ """
+ self.setup_and_verify(security_protocol, group="test-consumer-group")
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/core/get_offset_shell_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/get_offset_shell_test.py b/tests/kafkatest/tests/core/get_offset_shell_test.py
new file mode 100644
index 0000000..38bd9dc
--- /dev/null
+++ b/tests/kafkatest/tests/core/get_offset_shell_test.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.utils.util import wait_until
+from ducktape.tests.test import Test
+from kafkatest.services.verifiable_producer import VerifiableProducer
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.security.security_config import SecurityConfig
+
+TOPIC = "topic-get-offset-shell"
+MAX_MESSAGES = 100
+NUM_PARTITIONS = 1
+REPLICATION_FACTOR = 1
+
+class GetOffsetShellTest(Test):
+ """
+ Tests GetOffsetShell tool
+ """
+ def __init__(self, test_context):
+ super(GetOffsetShellTest, self).__init__(test_context)
+ self.num_zk = 1
+ self.num_brokers = 1
+ self.messages_received_count = 0
+ self.topics = {
+ TOPIC: {'partitions': NUM_PARTITIONS, 'replication-factor': REPLICATION_FACTOR}
+ }
+
+ self.zk = ZookeeperService(test_context, self.num_zk)
+
+
+
+ def setUp(self):
+ self.zk.start()
+
+ def start_kafka(self, security_protocol, interbroker_security_protocol):
+ self.kafka = KafkaService(
+ self.test_context, self.num_brokers,
+ self.zk, security_protocol=security_protocol,
+ interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
+ self.kafka.start()
+
+ def start_producer(self):
+ # This will produce to kafka cluster
+ self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, max_messages=MAX_MESSAGES)
+ self.producer.start()
+ current_acked = self.producer.num_acked
+ wait_until(lambda: self.producer.num_acked >= current_acked + MAX_MESSAGES, timeout_sec=10,
+ err_msg="Timeout awaiting messages to be produced and acked")
+
+ def start_consumer(self, security_protocol):
+ enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
+ self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
+ consumer_timeout_ms=1000, new_consumer=enable_new_consumer)
+ self.consumer.start()
+
+ def test_get_offset_shell(self, security_protocol='PLAINTEXT'):
+ """
+ Tests if GetOffsetShell is getting offsets correctly
+ :return: None
+ """
+ self.start_kafka(security_protocol, security_protocol)
+ self.start_producer()
+
+ # Assert that offset fetched without any consumers consuming is 0
+ assert self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, 0)
+
+ self.start_consumer(security_protocol)
+
+ node = self.consumer.nodes[0]
+
+ wait_until(lambda: self.consumer.alive(node), timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
+
+ # Assert that offset is correctly indicated by GetOffsetShell tool
+ wait_until(lambda: "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, MAX_MESSAGES) in self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), timeout_sec=10,
+ err_msg="Timed out waiting to reach expected offset.")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/core/mirror_maker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/mirror_maker_test.py b/tests/kafkatest/tests/core/mirror_maker_test.py
new file mode 100644
index 0000000..afb1972
--- /dev/null
+++ b/tests/kafkatest/tests/core/mirror_maker_test.py
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+from ducktape.mark import parametrize, matrix, ignore
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.mirror_maker import MirrorMaker
+from kafkatest.services.security.minikdc import MiniKdc
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+
+import time
+
+
+class TestMirrorMakerService(ProduceConsumeValidateTest):
+ """Sanity checks on mirror maker service class."""
+ def __init__(self, test_context):
+ super(TestMirrorMakerService, self).__init__(test_context)
+
+ self.topic = "topic"
+ self.source_zk = ZookeeperService(test_context, num_nodes=1)
+ self.target_zk = ZookeeperService(test_context, num_nodes=1)
+
+ self.source_kafka = KafkaService(test_context, num_nodes=1, zk=self.source_zk,
+ topics={self.topic: {"partitions": 1, "replication-factor": 1}})
+ self.target_kafka = KafkaService(test_context, num_nodes=1, zk=self.target_zk,
+ topics={self.topic: {"partitions": 1, "replication-factor": 1}})
+ # This will produce to source kafka cluster
+ self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.source_kafka, topic=self.topic,
+ throughput=1000)
+ self.mirror_maker = MirrorMaker(test_context, num_nodes=1, source=self.source_kafka, target=self.target_kafka,
+ whitelist=self.topic, offset_commit_interval_ms=1000)
+ # This will consume from target kafka cluster
+ self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.target_kafka, topic=self.topic,
+ message_validator=is_int, consumer_timeout_ms=60000)
+
+ def setUp(self):
+ # Source cluster
+ self.source_zk.start()
+
+ # Target cluster
+ self.target_zk.start()
+
+ def start_kafka(self, security_protocol):
+ self.source_kafka.security_protocol = security_protocol
+ self.source_kafka.interbroker_security_protocol = security_protocol
+ self.target_kafka.security_protocol = security_protocol
+ self.target_kafka.interbroker_security_protocol = security_protocol
+ if self.source_kafka.security_config.has_sasl_kerberos:
+ minikdc = MiniKdc(self.source_kafka.context, self.source_kafka.nodes + self.target_kafka.nodes)
+ self.source_kafka.minikdc = minikdc
+ self.target_kafka.minikdc = minikdc
+ minikdc.start()
+ self.source_kafka.start()
+ self.target_kafka.start()
+
+ def bounce(self, clean_shutdown=True):
+ """Bounce mirror maker with a clean (kill -15) or hard (kill -9) shutdown"""
+
+ # Wait until messages start appearing in the target cluster
+ wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=15)
+
+ # Wait for at least one offset to be committed.
+ #
+ # This step is necessary to prevent data loss with default mirror maker settings:
+ # currently, if we don't have at least one committed offset,
+ # and we bounce mirror maker, the consumer internals will throw OffsetOutOfRangeException, and the default
+ # auto.offset.reset policy ("largest") will kick in, causing mirrormaker to start consuming from the largest
+ # offset. As a result, any messages produced to the source cluster while mirrormaker was dead won't get
+ # mirrored to the target cluster.
+ # (see https://issues.apache.org/jira/browse/KAFKA-2759)
+ #
+ # This isn't necessary with kill -15 because mirror maker commits its offsets during graceful
+ # shutdown.
+ if not clean_shutdown:
+ time.sleep(self.mirror_maker.offset_commit_interval_ms / 1000.0 + .5)
+
+ for i in range(3):
+ self.logger.info("Bringing mirror maker nodes down...")
+ for node in self.mirror_maker.nodes:
+ self.mirror_maker.stop_node(node, clean_shutdown=clean_shutdown)
+
+ num_consumed = len(self.consumer.messages_consumed[1])
+ self.logger.info("Bringing mirror maker nodes back up...")
+ for node in self.mirror_maker.nodes:
+ self.mirror_maker.start_node(node)
+
+ # Ensure new messages are once again showing up on the target cluster
+ # new consumer requires higher timeout here
+ wait_until(lambda: len(self.consumer.messages_consumed[1]) > num_consumed + 100, timeout_sec=60)
+
+ def wait_for_n_messages(self, n_messages=100):
+ """Wait for a minimum number of messages to be successfully produced."""
+ wait_until(lambda: self.producer.num_acked > n_messages, timeout_sec=10,
+ err_msg="Producer failed to produce %d messages in a reasonable amount of time." % n_messages)
+
+ @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
+ @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], new_consumer=[True])
+ def test_simple_end_to_end(self, security_protocol, new_consumer):
+ """
+ Test end-to-end behavior under non-failure conditions.
+
+ Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster.
+ One is source, and the other is target. Single-node mirror maker mirrors from source to target.
+
+ - Start mirror maker.
+ - Produce a small number of messages to the source cluster.
+ - Consume messages from target.
+ - Verify that number of consumed messages matches the number produced.
+ """
+ self.start_kafka(security_protocol)
+ self.consumer.new_consumer = new_consumer
+
+ self.mirror_maker.new_consumer = new_consumer
+ self.mirror_maker.start()
+
+ mm_node = self.mirror_maker.nodes[0]
+ with mm_node.account.monitor_log(self.mirror_maker.LOG_FILE) as monitor:
+ if new_consumer:
+ monitor.wait_until("Resetting offset for partition", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
+ else:
+ monitor.wait_until("reset fetch offset", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
+
+ self.run_produce_consume_validate(core_test_action=self.wait_for_n_messages)
+ self.mirror_maker.stop()
+
+ @matrix(offsets_storage=["kafka", "zookeeper"], new_consumer=[False], clean_shutdown=[True, False])
+ @matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
+ def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True, security_protocol='PLAINTEXT'):
+ """
+ Test end-to-end behavior under failure conditions.
+
+ Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster.
+ One is source, and the other is target. Single-node mirror maker mirrors from source to target.
+
+ - Start mirror maker.
+ - Produce to source cluster, and consume from target cluster in the background.
+ - Bounce MM process
+ - Verify every message acknowledged by the source producer is consumed by the target consumer
+ """
+ if new_consumer and not clean_shutdown:
+ # Increase timeout on downstream console consumer; mirror maker with new consumer takes extra time
+ # during hard bounce. This is because the restarted mirror maker consumer won't be able to rejoin
+ # the group until the previous session times out
+ self.consumer.consumer_timeout_ms = 60000
+
+ self.start_kafka(security_protocol)
+ self.consumer.new_consumer = new_consumer
+
+ self.mirror_maker.offsets_storage = offsets_storage
+ self.mirror_maker.new_consumer = new_consumer
+ self.mirror_maker.start()
+
+ # Wait until mirror maker has reset fetch offset at least once before continuing with the rest of the test
+ mm_node = self.mirror_maker.nodes[0]
+ with mm_node.account.monitor_log(self.mirror_maker.LOG_FILE) as monitor:
+ if new_consumer:
+ monitor.wait_until("Resetting offset for partition", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
+ else:
+ monitor.wait_until("reset fetch offset", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
+
+ self.run_produce_consume_validate(core_test_action=lambda: self.bounce(clean_shutdown=clean_shutdown))
+ self.mirror_maker.stop()
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/core/reassign_partitions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/reassign_partitions_test.py b/tests/kafkatest/tests/core/reassign_partitions_test.py
new file mode 100644
index 0000000..24ce097
--- /dev/null
+++ b/tests/kafkatest/tests/core/reassign_partitions_test.py
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
+class ReassignPartitionsTest(ProduceConsumeValidateTest):
+ """
+ These tests validate partition reassignment.
+ Create a topic with few partitions, load some data, trigger partition re-assignment with and without broker failure,
+ check that partition re-assignment can complete and there is no data loss.
+ """
+
+ def __init__(self, test_context):
+ """:type test_context: ducktape.tests.test.TestContext"""
+ super(ReassignPartitionsTest, self).__init__(test_context=test_context)
+
+ self.topic = "test_topic"
+ self.zk = ZookeeperService(test_context, num_nodes=1)
+ self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
+ "partitions": 20,
+ "replication-factor": 3,
+ 'configs': {"min.insync.replicas": 2}}
+ })
+ self.num_partitions = 20
+ self.timeout_sec = 60
+ self.producer_throughput = 1000
+ self.num_producers = 1
+ self.num_consumers = 1
+
+ def setUp(self):
+ self.zk.start()
+
+ def min_cluster_size(self):
+ # Override this since we're adding services outside of the constructor
+ return super(ReassignPartitionsTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+ def clean_bounce_some_brokers(self):
+ """Bounce every other broker"""
+ for node in self.kafka.nodes[::2]:
+ self.kafka.restart_node(node, clean_shutdown=True)
+
+ def reassign_partitions(self, bounce_brokers):
+ partition_info = self.kafka.parse_describe_topic(self.kafka.describe_topic(self.topic))
+ self.logger.debug("Partitions before reassignment:" + str(partition_info))
+
+ # jumble partition assignment in dictionary
+ seed = random.randint(0, 2 ** 31 - 1)
+ self.logger.debug("Jumble partition assignment with seed " + str(seed))
+ random.seed(seed)
+ # The list may still be in order, but that's ok
+ shuffled_list = range(0, self.num_partitions)
+ random.shuffle(shuffled_list)
+
+ for i in range(0, self.num_partitions):
+ partition_info["partitions"][i]["partition"] = shuffled_list[i]
+ self.logger.debug("Jumbled partitions: " + str(partition_info))
+
+ # send reassign partitions command
+ self.kafka.execute_reassign_partitions(partition_info)
+
+ if bounce_brokers:
+ # bounce a few brokers at the same time
+ self.clean_bounce_some_brokers()
+
+ # Wait until finished or timeout
+ wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info), timeout_sec=self.timeout_sec, backoff_sec=.5)
+
+ @parametrize(security_protocol="PLAINTEXT", bounce_brokers=False)
+ @parametrize(security_protocol="PLAINTEXT", bounce_brokers=True)
+ def test_reassign_partitions(self, bounce_brokers, security_protocol):
+ """Reassign partitions tests.
+ Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
+
+ - Produce messages in the background
+ - Consume messages in the background
+ - Reassign partitions
+ - If bounce_brokers is True, also bounce a few brokers while partition re-assignment is in progress
+ - When done reassigning partitions and bouncing brokers, stop producing, and finish consuming
+ - Validate that every acked message was consumed
+ """
+
+ self.kafka.security_protocol = security_protocol
+ self.kafka.interbroker_security_protocol = security_protocol
+ new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True
+ self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
+ self.kafka.start()
+
+ self.run_produce_consume_validate(core_test_action=lambda: self.reassign_partitions(bounce_brokers))
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/core/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py
new file mode 100644
index 0000000..7b360ab
--- /dev/null
+++ b/tests/kafkatest/tests/core/replication_test.py
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+
+from ducktape.mark import matrix
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+
+import signal
+
+def broker_node(test, broker_type):
+ """ Discover node of requested type. For leader type, discovers leader for our topic and partition 0
+ """
+ if broker_type == "leader":
+ node = test.kafka.leader(test.topic, partition=0)
+ elif broker_type == "controller":
+ node = test.kafka.controller()
+ else:
+ raise Exception("Unexpected broker type %s." % (broker_type))
+
+ return node
+
+def clean_shutdown(test, broker_type):
+ """Discover broker node of requested type and shut it down cleanly.
+ """
+ node = broker_node(test, broker_type)
+ test.kafka.signal_node(node, sig=signal.SIGTERM)
+
+
+def hard_shutdown(test, broker_type):
+ """Discover broker node of requested type and shut it down with a hard kill."""
+ node = broker_node(test, broker_type)
+ test.kafka.signal_node(node, sig=signal.SIGKILL)
+
+
+def clean_bounce(test, broker_type):
+ """Chase the leader of one partition and restart it cleanly."""
+ for i in range(5):
+ prev_broker_node = broker_node(test, broker_type)
+ test.kafka.restart_node(prev_broker_node, clean_shutdown=True)
+
+
+def hard_bounce(test, broker_type):
+ """Chase the leader and restart it with a hard kill."""
+ for i in range(5):
+ prev_broker_node = broker_node(test, broker_type)
+ test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL)
+
+ # Since this is a hard kill, we need to make sure the process is down and that
+ # zookeeper and the broker cluster have registered the loss of the leader/controller.
+ # Waiting for a new leader for the topic-partition/controller to be elected is a reasonable heuristic for this.
+
+ def role_reassigned():
+ current_elected_broker = broker_node(test, broker_type)
+ return current_elected_broker is not None and current_elected_broker != prev_broker_node
+
+ wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0, timeout_sec=5)
+ wait_until(role_reassigned, timeout_sec=10, backoff_sec=.5)
+ test.kafka.start_node(prev_broker_node)
+
+failures = {
+ "clean_shutdown": clean_shutdown,
+ "hard_shutdown": hard_shutdown,
+ "clean_bounce": clean_bounce,
+ "hard_bounce": hard_bounce
+}
+
+
+class ReplicationTest(ProduceConsumeValidateTest):
+ """
+ Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
+ (foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
+ too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose
+ ordering guarantees.
+
+ Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked,
+ we might exit early if some messages are duplicated (though not an issue here since producer retries==0)
+
+ Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively
+ consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
+ indicator that nothing is left to consume.
+ """
+
+ def __init__(self, test_context):
+ """:type test_context: ducktape.tests.test.TestContext"""
+ super(ReplicationTest, self).__init__(test_context=test_context)
+
+ self.topic = "test_topic"
+ self.zk = ZookeeperService(test_context, num_nodes=1)
+ self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
+ "partitions": 3,
+ "replication-factor": 3,
+ 'configs': {"min.insync.replicas": 2}}
+ })
+ self.producer_throughput = 1000
+ self.num_producers = 1
+ self.num_consumers = 1
+
+ def setUp(self):
+ self.zk.start()
+
+ def min_cluster_size(self):
+ """Override this since we're adding services outside of the constructor"""
+ return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+
+ @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
+ broker_type=["leader"],
+ security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"])
+ @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
+ broker_type=["controller"],
+ security_protocol=["PLAINTEXT", "SASL_SSL"])
+ def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type):
+ """Replication tests.
+ These tests verify that replication provides simple durability guarantees by checking that data acked by
+ brokers is still available for consumption in the face of various failure scenarios.
+
+ Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
+
+ - Produce messages in the background
+ - Consume messages in the background
+ - Drive broker failures (shutdown, or bounce repeatedly with kill -15 or kill -9)
+ - When done driving failures, stop producing, and finish consuming
+ - Validate that every acked message was consumed
+ """
+
+ self.kafka.security_protocol = security_protocol
+ self.kafka.interbroker_security_protocol = security_protocol
+ new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True
+ self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
+ self.kafka.start()
+
+ self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self, broker_type))
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
new file mode 100644
index 0000000..fdbedca
--- /dev/null
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.security.security_config import SecurityConfig
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.utils import is_int
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from ducktape.mark import matrix
+from kafkatest.services.security.kafka_acls import ACLs
+import time
+
+
+class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
+ """Tests a rolling upgrade from PLAINTEXT to a secured cluster
+ """
+
+ def __init__(self, test_context):
+ super(TestSecurityRollingUpgrade, self).__init__(test_context=test_context)
+
+ def setUp(self):
+ self.acls = ACLs()
+ self.topic = "test_topic"
+ self.group = "group"
+ self.producer_throughput = 100
+ self.num_producers = 1
+ self.num_consumers = 1
+ self.zk = ZookeeperService(self.test_context, num_nodes=1)
+ self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
+ "partitions": 3,
+ "replication-factor": 3,
+ 'configs': {"min.insync.replicas": 2}}})
+ self.zk.start()
+
+ def create_producer_and_consumer(self):
+ self.producer = VerifiableProducer(
+ self.test_context, self.num_producers, self.kafka, self.topic,
+ throughput=self.producer_throughput)
+
+ self.consumer = ConsoleConsumer(
+ self.test_context, self.num_consumers, self.kafka, self.topic,
+ consumer_timeout_ms=60000, message_validator=is_int, new_consumer=True)
+
+ self.consumer.group_id = "group"
+
+ def bounce(self):
+ self.kafka.start_minikdc()
+ for node in self.kafka.nodes:
+ self.kafka.stop_node(node)
+ self.kafka.start_node(node)
+ time.sleep(10)
+
+ def roll_in_secured_settings(self, client_protocol, broker_protocol):
+
+ # Roll cluster to include inter broker security protocol.
+ self.kafka.interbroker_security_protocol = broker_protocol
+ self.kafka.open_port(client_protocol)
+ self.kafka.open_port(broker_protocol)
+ self.bounce()
+
+ # Roll cluster to disable PLAINTEXT port
+ self.kafka.close_port('PLAINTEXT')
+ self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
+ self.acls.set_acls(client_protocol, self.kafka, self.zk, self.topic, self.group)
+ self.acls.set_acls(broker_protocol, self.kafka, self.zk, self.topic, self.group)
+ self.bounce()
+
+ def open_secured_port(self, client_protocol):
+ self.kafka.security_protocol = client_protocol
+ self.kafka.open_port(client_protocol)
+ self.kafka.start_minikdc()
+ self.bounce()
+
+ @matrix(client_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"])
+ def test_rolling_upgrade_phase_one(self, client_protocol):
+ """
+ Start with a PLAINTEXT cluster, open a SECURED port, via a rolling upgrade, ensuring we could produce
+ and consume throughout over PLAINTEXT. Finally check we can produce and consume the new secured port.
+ """
+ self.kafka.interbroker_security_protocol = "PLAINTEXT"
+ self.kafka.security_protocol = "PLAINTEXT"
+ self.kafka.start()
+
+ # Create PLAINTEXT producer and consumer
+ self.create_producer_and_consumer()
+
+ # Rolling upgrade, opening a secure protocol, ensuring the Plaintext producer/consumer continues to run
+ self.run_produce_consume_validate(self.open_secured_port, client_protocol)
+
+ # Now we can produce and consume via the secured port
+ self.kafka.security_protocol = client_protocol
+ self.create_producer_and_consumer()
+ self.run_produce_consume_validate(lambda: time.sleep(1))
+
+ @matrix(client_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"], broker_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"])
+ def test_rolling_upgrade_phase_two(self, client_protocol, broker_protocol):
+ """
+ Start with a PLAINTEXT cluster with a second Secured port open (i.e. result of phase one).
+ Start an Producer and Consumer via the SECURED port
+ Incrementally upgrade to add inter-broker be the secure protocol
+ Incrementally upgrade again to add ACLs as well as disabling the PLAINTEXT port
+ Ensure the producer and consumer ran throughout
+ """
+ #Given we have a broker that has both secure and PLAINTEXT ports open
+ self.kafka.security_protocol = client_protocol
+ self.kafka.interbroker_security_protocol = "PLAINTEXT"
+ self.kafka.start()
+
+ #Create Secured Producer and Consumer
+ self.create_producer_and_consumer()
+
+ #Roll in the security protocol. Disable Plaintext. Ensure we can produce and Consume throughout
+ self.run_produce_consume_validate(self.roll_in_secured_settings, client_protocol, broker_protocol)
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/core/simple_consumer_shell_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/simple_consumer_shell_test.py b/tests/kafkatest/tests/core/simple_consumer_shell_test.py
new file mode 100644
index 0000000..74a7eeb
--- /dev/null
+++ b/tests/kafkatest/tests/core/simple_consumer_shell_test.py
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.utils.util import wait_until
+from ducktape.tests.test import Test
+from kafkatest.services.simple_consumer_shell import SimpleConsumerShell
+from kafkatest.services.verifiable_producer import VerifiableProducer
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+TOPIC = "topic-simple-consumer-shell"
+MAX_MESSAGES = 100
+NUM_PARTITIONS = 1
+REPLICATION_FACTOR = 1
+
+class SimpleConsumerShellTest(Test):
+ """
+ Tests SimpleConsumerShell tool
+ """
+ def __init__(self, test_context):
+ super(SimpleConsumerShellTest, self).__init__(test_context)
+ self.num_zk = 1
+ self.num_brokers = 1
+ self.messages_received_count = 0
+ self.topics = {
+ TOPIC: {'partitions': NUM_PARTITIONS, 'replication-factor': REPLICATION_FACTOR}
+ }
+
+ self.zk = ZookeeperService(test_context, self.num_zk)
+
+ def setUp(self):
+ self.zk.start()
+
+ def start_kafka(self):
+ self.kafka = KafkaService(
+ self.test_context, self.num_brokers,
+ self.zk, topics=self.topics)
+ self.kafka.start()
+
+ def run_producer(self):
+ # This will produce to kafka cluster
+ self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, max_messages=MAX_MESSAGES)
+ self.producer.start()
+ wait_until(lambda: self.producer.num_acked == MAX_MESSAGES, timeout_sec=10,
+ err_msg="Timeout awaiting messages to be produced and acked")
+
+ def start_simple_consumer_shell(self):
+ self.simple_consumer_shell = SimpleConsumerShell(self.test_context, 1, self.kafka, TOPIC)
+ self.simple_consumer_shell.start()
+
+ def test_simple_consumer_shell(self):
+ """
+ Tests if SimpleConsumerShell is fetching expected records
+ :return: None
+ """
+ self.start_kafka()
+ self.run_producer()
+ self.start_simple_consumer_shell()
+
+ # Assert that SimpleConsumerShell is fetching expected number of messages
+ wait_until(lambda: self.simple_consumer_shell.get_output().count("\n") == (MAX_MESSAGES + 1), timeout_sec=10,
+ err_msg="Timed out waiting to receive expected number of messages.")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/core/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py
new file mode 100644
index 0000000..9926f11
--- /dev/null
+++ b/tests/kafkatest/tests/core/upgrade_test.py
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import LATEST_0_8_2, LATEST_0_9, TRUNK, KafkaVersion
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import config_property
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+
+
+class TestUpgrade(ProduceConsumeValidateTest):
+
+ def __init__(self, test_context):
+ super(TestUpgrade, self).__init__(test_context=test_context)
+
+ def setUp(self):
+ self.topic = "test_topic"
+ self.zk = ZookeeperService(self.test_context, num_nodes=1)
+ self.zk.start()
+
+ # Producer and consumer
+ self.producer_throughput = 10000
+ self.num_producers = 1
+ self.num_consumers = 1
+
+ def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
+ self.logger.info("First pass bounce - rolling upgrade")
+ for node in self.kafka.nodes:
+ self.kafka.stop_node(node)
+ node.version = TRUNK
+ node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = from_kafka_version
+ node.config[config_property.MESSAGE_FORMAT_VERSION] = from_kafka_version
+ self.kafka.start_node(node)
+
+ self.logger.info("Second pass bounce - remove inter.broker.protocol.version config")
+ for node in self.kafka.nodes:
+ self.kafka.stop_node(node)
+ del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
+ if to_message_format_version is None:
+ del node.config[config_property.MESSAGE_FORMAT_VERSION]
+ else:
+ node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
+ self.kafka.start_node(node)
+
+
+ @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"])
+ @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"], new_consumer=True)
+ @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["none"])
+ @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True)
+ @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"])
+ @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"])
+ def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types, new_consumer=False):
+ """Test upgrade of Kafka broker cluster from 0.8.2 or 0.9.0 to 0.10
+
+ from_kafka_version is a Kafka version to upgrade from: either 0.8.2.X or 0.9
+
+ If to_message_format_version is None, it means that we will upgrade to default (latest)
+ message format version. It is possible to upgrade to 0.10 brokers but still use message
+ format version 0.9
+
+ - Start 3 node broker cluster on version 'from_kafka_version'
+ - Start producer and consumer in the background
+ - Perform two-phase rolling upgrade
+ - First phase: upgrade brokers to 0.10 with inter.broker.protocol.version set to
+ from_kafka_version and log.message.format.version set to from_kafka_version
+ - Second phase: remove inter.broker.protocol.version config with rolling bounce; if
+ to_message_format_version is set to 0.9, set log.message.format.version to
+ to_message_format_version, otherwise remove log.message.format.version config
+ - Finally, validate that every message acked by the producer was consumed by the consumer
+ """
+ self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk,
+ version=KafkaVersion(from_kafka_version),
+ topics={self.topic: {"partitions": 3, "replication-factor": 3,
+ 'configs': {"min.insync.replicas": 2}}})
+ self.kafka.start()
+
+ self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+ self.topic, throughput=self.producer_throughput,
+ message_validator=is_int,
+ compression_types=compression_types,
+ version=KafkaVersion(from_kafka_version))
+
+ # TODO - reduce the timeout
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+ self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
+ message_validator=is_int, version=KafkaVersion(from_kafka_version))
+
+ self.run_produce_consume_validate(core_test_action=lambda: self.perform_upgrade(from_kafka_version,
+ to_message_format_version))
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
new file mode 100644
index 0000000..7f80deb
--- /dev/null
+++ b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
@@ -0,0 +1,117 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.security.security_config import SecurityConfig
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.services.security.kafka_acls import ACLs
+from kafkatest.utils import is_int
+import time
+
+class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
+ """Tests a rolling upgrade for zookeeper.
+ """
+
+ def __init__(self, test_context):
+ super(ZooKeeperSecurityUpgradeTest, self).__init__(test_context=test_context)
+
+ def setUp(self):
+ self.topic = "test_topic"
+ self.group = "group"
+ self.producer_throughput = 100
+ self.num_producers = 1
+ self.num_consumers = 1
+ self.acls = ACLs()
+
+ self.zk = ZookeeperService(self.test_context, num_nodes=3)
+
+ self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
+ "partitions": 3,
+ "replication-factor": 3,
+ 'configs': {"min.insync.replicas": 2}}})
+
+ def create_producer_and_consumer(self):
+ self.producer = VerifiableProducer(
+ self.test_context, self.num_producers, self.kafka, self.topic,
+ throughput=self.producer_throughput)
+
+ self.consumer = ConsoleConsumer(
+ self.test_context, self.num_consumers, self.kafka, self.topic,
+ consumer_timeout_ms=60000, message_validator=is_int, new_consumer=True)
+
+ self.consumer.group_id = self.group
+
+ @property
+ def no_sasl(self):
+ return self.kafka.security_protocol == "PLAINTEXT" or self.kafka.security_protocol == "SSL"
+
+ @property
+ def is_secure(self):
+ return self.kafka.security_protocol == "SASL_PLAINTEXT" \
+ or self.kafka.security_protocol == "SSL" \
+ or self.kafka.security_protocol == "SASL_SSL"
+
+ def run_zk_migration(self):
+ # change zk config (auth provider + jaas login)
+ self.zk.kafka_opts = self.zk.security_system_properties
+ self.zk.zk_sasl = True
+ if self.no_sasl:
+ self.kafka.start_minikdc(self.zk.zk_principals)
+ # restart zk
+ for node in self.zk.nodes:
+ self.zk.stop_node(node)
+ self.zk.start_node(node)
+
+ # restart broker with jaas login
+ for node in self.kafka.nodes:
+ self.kafka.stop_node(node)
+ self.kafka.start_node(node)
+
+ # run migration tool
+ for node in self.zk.nodes:
+ self.zk.zookeeper_migration(node, "secure")
+
+ # restart broker with zookeeper.set.acl=true and acls
+ self.kafka.zk_set_acl = "true"
+ for node in self.kafka.nodes:
+ self.kafka.stop_node(node)
+ self.kafka.start_node(node)
+
+ @matrix(security_protocol=["PLAINTEXT","SSL","SASL_SSL","SASL_PLAINTEXT"])
+ def test_zk_security_upgrade(self, security_protocol):
+ self.zk.start()
+ self.kafka.security_protocol = security_protocol
+ self.kafka.interbroker_security_protocol = security_protocol
+
+ # set acls
+ if self.is_secure:
+ self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
+ self.acls.set_acls(security_protocol, self.kafka, self.zk, self.topic, self.group)
+
+ if(self.no_sasl):
+ self.kafka.start()
+ else:
+ self.kafka.start(self.zk.zk_principals)
+
+ #Create Producer and Consumer
+ self.create_producer_and_consumer()
+
+ #Run upgrade
+ self.run_produce_consume_validate(self.run_zk_migration)
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/get_offset_shell_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/get_offset_shell_test.py b/tests/kafkatest/tests/get_offset_shell_test.py
deleted file mode 100644
index 38bd9dc..0000000
--- a/tests/kafkatest/tests/get_offset_shell_test.py
+++ /dev/null
@@ -1,91 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ducktape.utils.util import wait_until
-from ducktape.tests.test import Test
-from kafkatest.services.verifiable_producer import VerifiableProducer
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.security.security_config import SecurityConfig
-
-TOPIC = "topic-get-offset-shell"
-MAX_MESSAGES = 100
-NUM_PARTITIONS = 1
-REPLICATION_FACTOR = 1
-
-class GetOffsetShellTest(Test):
- """
- Tests GetOffsetShell tool
- """
- def __init__(self, test_context):
- super(GetOffsetShellTest, self).__init__(test_context)
- self.num_zk = 1
- self.num_brokers = 1
- self.messages_received_count = 0
- self.topics = {
- TOPIC: {'partitions': NUM_PARTITIONS, 'replication-factor': REPLICATION_FACTOR}
- }
-
- self.zk = ZookeeperService(test_context, self.num_zk)
-
-
-
- def setUp(self):
- self.zk.start()
-
- def start_kafka(self, security_protocol, interbroker_security_protocol):
- self.kafka = KafkaService(
- self.test_context, self.num_brokers,
- self.zk, security_protocol=security_protocol,
- interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
- self.kafka.start()
-
- def start_producer(self):
- # This will produce to kafka cluster
- self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, max_messages=MAX_MESSAGES)
- self.producer.start()
- current_acked = self.producer.num_acked
- wait_until(lambda: self.producer.num_acked >= current_acked + MAX_MESSAGES, timeout_sec=10,
- err_msg="Timeout awaiting messages to be produced and acked")
-
- def start_consumer(self, security_protocol):
- enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
- self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
- consumer_timeout_ms=1000, new_consumer=enable_new_consumer)
- self.consumer.start()
-
- def test_get_offset_shell(self, security_protocol='PLAINTEXT'):
- """
- Tests if GetOffsetShell is getting offsets correctly
- :return: None
- """
- self.start_kafka(security_protocol, security_protocol)
- self.start_producer()
-
- # Assert that offset fetched without any consumers consuming is 0
- assert self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, 0)
-
- self.start_consumer(security_protocol)
-
- node = self.consumer.nodes[0]
-
- wait_until(lambda: self.consumer.alive(node), timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
-
- # Assert that offset is correctly indicated by GetOffsetShell tool
- wait_until(lambda: "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, MAX_MESSAGES) in self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), timeout_sec=10,
- err_msg="Timed out waiting to reach expected offset.")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/log4j_appender_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/log4j_appender_test.py b/tests/kafkatest/tests/log4j_appender_test.py
deleted file mode 100644
index 42cfeea..0000000
--- a/tests/kafkatest/tests/log4j_appender_test.py
+++ /dev/null
@@ -1,93 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ducktape.utils.util import wait_until
-from ducktape.tests.test import Test
-from ducktape.mark import matrix
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka_log4j_appender import KafkaLog4jAppender
-from kafkatest.services.security.security_config import SecurityConfig
-
-TOPIC = "topic-log4j-appender"
-MAX_MESSAGES = 100
-
-class Log4jAppenderTest(Test):
- """
- Tests KafkaLog4jAppender using VerifiableKafkaLog4jAppender that appends increasing ints to a Kafka topic
- """
- def __init__(self, test_context):
- super(Log4jAppenderTest, self).__init__(test_context)
- self.num_zk = 1
- self.num_brokers = 1
- self.messages_received_count = 0
- self.topics = {
- TOPIC: {'partitions': 1, 'replication-factor': 1}
- }
-
- self.zk = ZookeeperService(test_context, self.num_zk)
-
- def setUp(self):
- self.zk.start()
-
- def start_kafka(self, security_protocol, interbroker_security_protocol):
- self.kafka = KafkaService(
- self.test_context, self.num_brokers,
- self.zk, security_protocol=security_protocol,
- interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
- self.kafka.start()
-
- def start_appender(self, security_protocol):
- self.appender = KafkaLog4jAppender(self.test_context, self.num_brokers, self.kafka, TOPIC, MAX_MESSAGES,
- security_protocol=security_protocol)
- self.appender.start()
-
- def custom_message_validator(self, msg):
- if msg and "INFO : org.apache.kafka.tools.VerifiableLog4jAppender" in msg:
- self.logger.debug("Received message: %s" % msg)
- self.messages_received_count += 1
-
-
- def start_consumer(self, security_protocol):
- enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
- self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
- consumer_timeout_ms=1000, new_consumer=enable_new_consumer,
- message_validator=self.custom_message_validator)
- self.consumer.start()
-
- @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
- def test_log4j_appender(self, security_protocol='PLAINTEXT'):
- """
- Tests if KafkaLog4jAppender is producing to Kafka topic
- :return: None
- """
- self.start_kafka(security_protocol, security_protocol)
- self.start_appender(security_protocol)
- self.appender.wait()
-
- self.start_consumer(security_protocol)
- node = self.consumer.nodes[0]
-
- wait_until(lambda: self.consumer.alive(node),
- timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
-
- # Verify consumed messages count
- wait_until(lambda: self.messages_received_count == MAX_MESSAGES, timeout_sec=10,
- err_msg="Timed out waiting to consume expected number of messages.")
-
- self.consumer.stop()
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/message_format_change_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/message_format_change_test.py b/tests/kafkatest/tests/message_format_change_test.py
deleted file mode 100644
index 357fd17..0000000
--- a/tests/kafkatest/tests/message_format_change_test.py
+++ /dev/null
@@ -1,92 +0,0 @@
-# Copyright 2015 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.tests.test import Test
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.utils import is_int
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.services.kafka import config_property
-import time
-
-
-class MessageFormatChangeTest(ProduceConsumeValidateTest):
-
- def __init__(self, test_context):
- super(MessageFormatChangeTest, self).__init__(test_context=test_context)
-
- def setUp(self):
- self.topic = "test_topic"
- self.zk = ZookeeperService(self.test_context, num_nodes=1)
-
- self.zk.start()
-
- # Producer and consumer
- self.producer_throughput = 10000
- self.num_producers = 1
- self.num_consumers = 1
- self.messages_per_producer = 100
-
- def produce_and_consume(self, producer_version, consumer_version, group):
- self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
- self.topic,
- throughput=self.producer_throughput,
- message_validator=is_int,
- version=KafkaVersion(producer_version))
- self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
- self.topic, consumer_timeout_ms=30000,
- message_validator=is_int, version=KafkaVersion(consumer_version))
- self.consumer.group_id = group
- self.run_produce_consume_validate(lambda: wait_until(
- lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
- timeout_sec=120, backoff_sec=1,
- err_msg="Producer did not produce all messages in reasonable amount of time"))
-
- @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
- @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
- def test_compatibility(self, producer_version, consumer_version):
- """ This tests performs the following checks:
- The workload is a mix of 0.9.x and 0.10.x producers and consumers
- that produce to and consume from a 0.10.x cluster
- 1. initially the topic is using message format 0.9.0
- 2. change the message format version for topic to 0.10.0 on the fly.
- 3. change the message format version for topic back to 0.9.0 on the fly.
- - The producers and consumers should not have any issue.
- - Note that for 0.9.x consumers/producers we only do steps 1 and 2
- """
- self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
- "partitions": 3,
- "replication-factor": 3,
- 'configs': {"min.insync.replicas": 2}}})
-
- self.kafka.start()
- self.logger.info("First format change to 0.9.0")
- self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
- self.produce_and_consume(producer_version, consumer_version, "group1")
-
- self.logger.info("Second format change to 0.10.0")
- self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
- self.produce_and_consume(producer_version, consumer_version, "group2")
-
- if producer_version == str(TRUNK) and consumer_version == str(TRUNK):
- self.logger.info("Third format change back to 0.9.0")
- self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
- self.produce_and_consume(producer_version, consumer_version, "group3")
-
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/mirror_maker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/mirror_maker_test.py b/tests/kafkatest/tests/mirror_maker_test.py
deleted file mode 100644
index afb1972..0000000
--- a/tests/kafkatest/tests/mirror_maker_test.py
+++ /dev/null
@@ -1,179 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize, matrix, ignore
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.mirror_maker import MirrorMaker
-from kafkatest.services.security.minikdc import MiniKdc
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-
-import time
-
-
-class TestMirrorMakerService(ProduceConsumeValidateTest):
- """Sanity checks on mirror maker service class."""
- def __init__(self, test_context):
- super(TestMirrorMakerService, self).__init__(test_context)
-
- self.topic = "topic"
- self.source_zk = ZookeeperService(test_context, num_nodes=1)
- self.target_zk = ZookeeperService(test_context, num_nodes=1)
-
- self.source_kafka = KafkaService(test_context, num_nodes=1, zk=self.source_zk,
- topics={self.topic: {"partitions": 1, "replication-factor": 1}})
- self.target_kafka = KafkaService(test_context, num_nodes=1, zk=self.target_zk,
- topics={self.topic: {"partitions": 1, "replication-factor": 1}})
- # This will produce to source kafka cluster
- self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.source_kafka, topic=self.topic,
- throughput=1000)
- self.mirror_maker = MirrorMaker(test_context, num_nodes=1, source=self.source_kafka, target=self.target_kafka,
- whitelist=self.topic, offset_commit_interval_ms=1000)
- # This will consume from target kafka cluster
- self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.target_kafka, topic=self.topic,
- message_validator=is_int, consumer_timeout_ms=60000)
-
- def setUp(self):
- # Source cluster
- self.source_zk.start()
-
- # Target cluster
- self.target_zk.start()
-
- def start_kafka(self, security_protocol):
- self.source_kafka.security_protocol = security_protocol
- self.source_kafka.interbroker_security_protocol = security_protocol
- self.target_kafka.security_protocol = security_protocol
- self.target_kafka.interbroker_security_protocol = security_protocol
- if self.source_kafka.security_config.has_sasl_kerberos:
- minikdc = MiniKdc(self.source_kafka.context, self.source_kafka.nodes + self.target_kafka.nodes)
- self.source_kafka.minikdc = minikdc
- self.target_kafka.minikdc = minikdc
- minikdc.start()
- self.source_kafka.start()
- self.target_kafka.start()
-
- def bounce(self, clean_shutdown=True):
- """Bounce mirror maker with a clean (kill -15) or hard (kill -9) shutdown"""
-
- # Wait until messages start appearing in the target cluster
- wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=15)
-
- # Wait for at least one offset to be committed.
- #
- # This step is necessary to prevent data loss with default mirror maker settings:
- # currently, if we don't have at least one committed offset,
- # and we bounce mirror maker, the consumer internals will throw OffsetOutOfRangeException, and the default
- # auto.offset.reset policy ("largest") will kick in, causing mirrormaker to start consuming from the largest
- # offset. As a result, any messages produced to the source cluster while mirrormaker was dead won't get
- # mirrored to the target cluster.
- # (see https://issues.apache.org/jira/browse/KAFKA-2759)
- #
- # This isn't necessary with kill -15 because mirror maker commits its offsets during graceful
- # shutdown.
- if not clean_shutdown:
- time.sleep(self.mirror_maker.offset_commit_interval_ms / 1000.0 + .5)
-
- for i in range(3):
- self.logger.info("Bringing mirror maker nodes down...")
- for node in self.mirror_maker.nodes:
- self.mirror_maker.stop_node(node, clean_shutdown=clean_shutdown)
-
- num_consumed = len(self.consumer.messages_consumed[1])
- self.logger.info("Bringing mirror maker nodes back up...")
- for node in self.mirror_maker.nodes:
- self.mirror_maker.start_node(node)
-
- # Ensure new messages are once again showing up on the target cluster
- # new consumer requires higher timeout here
- wait_until(lambda: len(self.consumer.messages_consumed[1]) > num_consumed + 100, timeout_sec=60)
-
- def wait_for_n_messages(self, n_messages=100):
- """Wait for a minimum number of messages to be successfully produced."""
- wait_until(lambda: self.producer.num_acked > n_messages, timeout_sec=10,
- err_msg="Producer failed to produce %d messages in a reasonable amount of time." % n_messages)
-
- @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
- @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], new_consumer=[True])
- def test_simple_end_to_end(self, security_protocol, new_consumer):
- """
- Test end-to-end behavior under non-failure conditions.
-
- Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster.
- One is source, and the other is target. Single-node mirror maker mirrors from source to target.
-
- - Start mirror maker.
- - Produce a small number of messages to the source cluster.
- - Consume messages from target.
- - Verify that number of consumed messages matches the number produced.
- """
- self.start_kafka(security_protocol)
- self.consumer.new_consumer = new_consumer
-
- self.mirror_maker.new_consumer = new_consumer
- self.mirror_maker.start()
-
- mm_node = self.mirror_maker.nodes[0]
- with mm_node.account.monitor_log(self.mirror_maker.LOG_FILE) as monitor:
- if new_consumer:
- monitor.wait_until("Resetting offset for partition", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
- else:
- monitor.wait_until("reset fetch offset", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
-
- self.run_produce_consume_validate(core_test_action=self.wait_for_n_messages)
- self.mirror_maker.stop()
-
- @matrix(offsets_storage=["kafka", "zookeeper"], new_consumer=[False], clean_shutdown=[True, False])
- @matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
- def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True, security_protocol='PLAINTEXT'):
- """
- Test end-to-end behavior under failure conditions.
-
- Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster.
- One is source, and the other is target. Single-node mirror maker mirrors from source to target.
-
- - Start mirror maker.
- - Produce to source cluster, and consume from target cluster in the background.
- - Bounce MM process
- - Verify every message acknowledged by the source producer is consumed by the target consumer
- """
- if new_consumer and not clean_shutdown:
- # Increase timeout on downstream console consumer; mirror maker with new consumer takes extra time
- # during hard bounce. This is because the restarted mirror maker consumer won't be able to rejoin
- # the group until the previous session times out
- self.consumer.consumer_timeout_ms = 60000
-
- self.start_kafka(security_protocol)
- self.consumer.new_consumer = new_consumer
-
- self.mirror_maker.offsets_storage = offsets_storage
- self.mirror_maker.new_consumer = new_consumer
- self.mirror_maker.start()
-
- # Wait until mirror maker has reset fetch offset at least once before continuing with the rest of the test
- mm_node = self.mirror_maker.nodes[0]
- with mm_node.account.monitor_log(self.mirror_maker.LOG_FILE) as monitor:
- if new_consumer:
- monitor.wait_until("Resetting offset for partition", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
- else:
- monitor.wait_until("reset fetch offset", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
-
- self.run_produce_consume_validate(core_test_action=lambda: self.bounce(clean_shutdown=clean_shutdown))
- self.mirror_maker.stop()