You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2021/10/20 06:21:44 UTC
[kafka] branch trunk updated: MINOR: Add a replication system test
which simulates a slow replica (#11395)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 38a3ddb MINOR: Add a replication system test which simulates a slow replica (#11395)
38a3ddb is described below
commit 38a3ddb5627f7860aea40cb602186bde79808d3d
Author: David Jacot <dj...@confluent.io>
AuthorDate: Wed Oct 20 08:19:36 2021 +0200
MINOR: Add a replication system test which simulates a slow replica (#11395)
This patch adds a new system test which exercises the ISR shrinking/expansion process of the partition leader. It does so by introducing a network partition which isolates a broker from the other brokers in the cluster but not from the KRaft controller or ZooKeeper.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
tests/kafkatest/services/kafka/kafka.py | 15 +--
.../tests/core/replication_replica_failure_test.py | 123 +++++++++++++++++++++
tests/kafkatest/tests/end_to_end.py | 4 +
3 files changed, 135 insertions(+), 7 deletions(-)
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index e057015..f275853 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -902,7 +902,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
clean_shutdown=False, allow_fail=True)
node.account.ssh("sudo rm -rf -- %s" % KafkaService.PERSISTENT_ROOT, allow_fail=False)
- def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None):
+ def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol=None, offline_nodes=[]):
if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
raise Exception("Must invoke kafka-topics against a broker, not a KRaft controller")
if force_use_zk_connection:
@@ -918,7 +918,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
security_protocol_to_use = self.security_protocol
else:
security_protocol_to_use = kafka_security_protocol
- bootstrap_server_or_zookeeper = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use))
+ bootstrap_server_or_zookeeper = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use, offline_nodes=offline_nodes))
skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT
if skip_optional_security_settings:
optional_jass_krb_system_props_prefix = ""
@@ -1168,7 +1168,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
return under_replicated_partitions
- def describe_topic(self, topic, node=None):
+ def describe_topic(self, topic, node=None, offline_nodes=[]):
if node is None:
node = self.nodes[0]
@@ -1176,7 +1176,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
cmd = fix_opts_for_new_jvm(node)
cmd += "%s --topic %s --describe" % \
- (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection), topic)
+ (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection, offline_nodes=offline_nodes), topic)
self.logger.info("Running topic describe command...\n%s" % cmd)
output = ""
@@ -1447,10 +1447,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
found_lines = [line for line in describe_topic_output.splitlines() if grep_for in line]
return None if not found_lines else found_lines[0]
- def isr_idx_list(self, topic, partition=0):
+ def isr_idx_list(self, topic, partition=0, node=None, offline_nodes=[]):
""" Get in-sync replica list the given topic and partition.
"""
- node = self.nodes[0]
+ if node is None:
+ node = self.nodes[0]
if not self.all_nodes_topic_command_supports_bootstrap_server():
self.logger.debug("Querying zookeeper to find in-sync replicas for topic %s and partition %d" % (topic, partition))
zk_path = "/brokers/topics/%s/partitions/%d/state" % (topic, partition)
@@ -1465,7 +1466,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
isr_idx_list = partition_state["isr"]
else:
self.logger.debug("Querying Kafka Admin API to find in-sync replicas for topic %s and partition %d" % (topic, partition))
- describe_output = self.describe_topic(topic, node)
+ describe_output = self.describe_topic(topic, node, offline_nodes=offline_nodes)
self.logger.debug(describe_output)
requested_partition_line = self._describe_topic_line_for_partition(partition, describe_output)
# e.g. Topic: test_topic Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2
diff --git a/tests/kafkatest/tests/core/replication_replica_failure_test.py b/tests/kafkatest/tests/core/replication_replica_failure_test.py
new file mode 100644
index 0000000..42e1c09
--- /dev/null
+++ b/tests/kafkatest/tests/core/replication_replica_failure_test.py
@@ -0,0 +1,123 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+
+from ducktape.cluster.remoteaccount import RemoteCommandError
+from ducktape.mark import matrix
+from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
+
+from kafkatest.services.kafka import quorum
+from kafkatest.tests.end_to_end import EndToEndTest
+from kafkatest.services.kafka import config_property
+from kafkatest.services.trogdor.network_partition_fault_spec import NetworkPartitionFaultSpec
+from kafkatest.services.trogdor.task_spec import TaskSpec
+from kafkatest.services.trogdor.trogdor import TrogdorService
+
+import signal
+import time
+
+class ReplicationReplicaFailureTest(EndToEndTest):
+
+ def __init__(self, test_context):
+ """:type test_context: ducktape.tests.test.TestContext"""
+ super(ReplicationReplicaFailureTest, self).__init__(test_context=test_context, topic=None)
+
+ @cluster(num_nodes=7)
+ @matrix(metadata_quorum=quorum.all_non_upgrade)
+ def test_replication_with_replica_failure(self, metadata_quorum=quorum.zk):
+ """
+ This test verifies that replication shrinks the ISR when a replica is not fetching anymore.
+ It also verifies that replication provides simple durability guarantees by checking that data acked by
+ brokers is still available for consumption.
+
+ Setup: 1 zk/KRaft controller, 3 kafka nodes, 1 topic with partitions=1, replication-factor=3, and min.insync.replicas=2
+ - Produce messages in the background
+ - Consume messages in the background
+ - Partition a follower
+ - Validate that the ISR was shrunk
+ - Stop producing and finish consuming
+ - Validate that every acked message was consumed
+ """
+ self.create_zookeeper_if_necessary()
+ if self.zk:
+ self.zk.start()
+
+ self.create_kafka(num_nodes=3,
+ server_prop_overrides=[["replica.lag.time.max.ms", "10000"]],
+ controller_num_nodes_override=1)
+ self.kafka.start()
+
+ self.trogdor = TrogdorService(context=self.test_context,
+ client_services=[self.kafka])
+ self.trogdor.start()
+
+ # If ZK is used, the partition leader is put on the controller node
+ # to avoid partitioning the controller later on in the test.
+ if self.zk:
+ controller = self.kafka.controller()
+ assignment = [self.kafka.idx(controller)] + [self.kafka.idx(node) for node in self.kafka.nodes if node != controller]
+ else:
+ assignment = [self.kafka.idx(node) for node in self.kafka.nodes]
+
+ self.topic = "test_topic"
+ self.kafka.create_topic({"topic": self.topic,
+ "replica-assignment": ":".join(map(str, assignment)),
+ "configs": {"min.insync.replicas": 2}})
+
+ self.logger.info("Created topic %s with assignment %s", self.topic, ", ".join(map(str, assignment)))
+
+ self.create_producer()
+ self.producer.start()
+
+ self.create_consumer()
+ self.consumer.start()
+
+ self.await_startup()
+
+ leader = self.kafka.leader(self.topic, partition=0)
+ replicas = self.kafka.replicas(self.topic, partition=0)
+
+ # One of the followers is picked to be partitioned.
+ follower_to_partition = [replica for replica in replicas if replica != leader][0]
+ self.logger.info("Partitioning follower %s (%s) from the other brokers", self.kafka.idx(follower_to_partition), follower_to_partition.name)
+ partition_spec = NetworkPartitionFaultSpec(0, 5*60*1000,
+ [[follower_to_partition], [node for node in self.kafka.nodes if node != follower_to_partition]])
+ partition = self.trogdor.create_task("partition", partition_spec)
+
+ def current_isr():
+ try:
+ # Due to the network partition, the kafka-topics command could fail if it tries
+ # to connect to the partitioned broker. Therefore we catch the error here and retry.
+ return set(self.kafka.isr_idx_list(self.topic, partition=0, node=leader, offline_nodes=[follower_to_partition]))
+ except RemoteCommandError as e:
+ return set()
+
+ # Verify that ISR is shrunk.
+ expected_isr = {self.kafka.idx(replica) for replica in replicas if replica != follower_to_partition}
+ wait_until(lambda: current_isr() == expected_isr,
+ timeout_sec=120, backoff_sec=1, err_msg="ISR should have been shrunk.")
+
+ # Wait until the network partition is removed.
+ partition.stop()
+ partition.wait_for_done(timeout_sec=300)
+
+ # Verify that ISR is expanded.
+ expected_isr = {self.kafka.idx(replica) for replica in replicas}
+ wait_until(lambda: current_isr() == expected_isr,
+ timeout_sec=120, backoff_sec=1, err_msg="ISR should have been expanded.")
+
+ self.run_validation(producer_timeout_sec=120, min_records=25000)
diff --git a/tests/kafkatest/tests/end_to_end.py b/tests/kafkatest/tests/end_to_end.py
index bfc316e..5330565 100644
--- a/tests/kafkatest/tests/end_to_end.py
+++ b/tests/kafkatest/tests/end_to_end.py
@@ -55,6 +55,10 @@ class EndToEndTest(Test):
self.topic: self.topic_config,
"__consumer_offsets": group_metadata_config
}
+
+ if self.topic:
+ topics[self.topic] = self.topic_config
+
self.kafka = KafkaService(self.test_context, num_nodes=num_nodes,
zk=self.zk, topics=topics, **kwargs)