You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2021/10/20 06:21:44 UTC

[kafka] branch trunk updated: MINOR: Add a replication system test which simulates a slow replica (#11395)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 38a3ddb  MINOR: Add a replication system test which simulates a slow replica (#11395)
38a3ddb is described below

commit 38a3ddb5627f7860aea40cb602186bde79808d3d
Author: David Jacot <dj...@confluent.io>
AuthorDate: Wed Oct 20 08:19:36 2021 +0200

    MINOR: Add a replication system test which simulates a slow replica (#11395)
    
    This patch adds a new system test which exercises the ISR shrinking/expansion process of the partition leader. It does so by introducing a network partition which isolates a broker from the other brokers in the cluster but not from the KRaft controller/ZK.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 tests/kafkatest/services/kafka/kafka.py            |  15 +--
 .../tests/core/replication_replica_failure_test.py | 123 +++++++++++++++++++++
 tests/kafkatest/tests/end_to_end.py                |   4 +
 3 files changed, 135 insertions(+), 7 deletions(-)

diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index e057015..f275853 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -902,7 +902,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                                          clean_shutdown=False, allow_fail=True)
         node.account.ssh("sudo rm -rf -- %s" % KafkaService.PERSISTENT_ROOT, allow_fail=False)
 
-    def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None):
+    def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol=None, offline_nodes=[]):
         if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
             raise Exception("Must invoke kafka-topics against a broker, not a KRaft controller")
         if force_use_zk_connection:
@@ -918,7 +918,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                     security_protocol_to_use = self.security_protocol
             else:
                 security_protocol_to_use = kafka_security_protocol
-            bootstrap_server_or_zookeeper = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use))
+            bootstrap_server_or_zookeeper = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use, offline_nodes=offline_nodes))
             skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT
         if skip_optional_security_settings:
             optional_jass_krb_system_props_prefix = ""
@@ -1168,7 +1168,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
         return under_replicated_partitions
 
-    def describe_topic(self, topic, node=None):
+    def describe_topic(self, topic, node=None, offline_nodes=[]):
         if node is None:
             node = self.nodes[0]
 
@@ -1176,7 +1176,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
         cmd = fix_opts_for_new_jvm(node)
         cmd += "%s --topic %s --describe" % \
-               (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection), topic)
+               (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection, offline_nodes=offline_nodes), topic)
 
         self.logger.info("Running topic describe command...\n%s" % cmd)
         output = ""
@@ -1447,10 +1447,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         found_lines = [line for line in describe_topic_output.splitlines() if grep_for in line]
         return None if not found_lines else found_lines[0]
 
-    def isr_idx_list(self, topic, partition=0):
+    def isr_idx_list(self, topic, partition=0, node=None, offline_nodes=[]):
         """ Get in-sync replica list the given topic and partition.
         """
-        node = self.nodes[0]
+        if node is None:
+          node = self.nodes[0]
         if not self.all_nodes_topic_command_supports_bootstrap_server():
             self.logger.debug("Querying zookeeper to find in-sync replicas for topic %s and partition %d" % (topic, partition))
             zk_path = "/brokers/topics/%s/partitions/%d/state" % (topic, partition)
@@ -1465,7 +1466,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             isr_idx_list = partition_state["isr"]
         else:
             self.logger.debug("Querying Kafka Admin API to find in-sync replicas for topic %s and partition %d" % (topic, partition))
-            describe_output = self.describe_topic(topic, node)
+            describe_output = self.describe_topic(topic, node, offline_nodes=offline_nodes)
             self.logger.debug(describe_output)
             requested_partition_line = self._describe_topic_line_for_partition(partition, describe_output)
             # e.g. Topic: test_topic	Partition: 0	Leader: 3	Replicas: 3,2	Isr: 3,2
diff --git a/tests/kafkatest/tests/core/replication_replica_failure_test.py b/tests/kafkatest/tests/core/replication_replica_failure_test.py
new file mode 100644
index 0000000..42e1c09
--- /dev/null
+++ b/tests/kafkatest/tests/core/replication_replica_failure_test.py
@@ -0,0 +1,123 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+
+from ducktape.cluster.remoteaccount import RemoteCommandError
+from ducktape.mark import matrix
+from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
+
+from kafkatest.services.kafka import quorum
+from kafkatest.tests.end_to_end import EndToEndTest
+from kafkatest.services.kafka import config_property
+from kafkatest.services.trogdor.network_partition_fault_spec import NetworkPartitionFaultSpec
+from kafkatest.services.trogdor.task_spec import TaskSpec
+from kafkatest.services.trogdor.trogdor import TrogdorService
+
+import signal
+import time
+
+class ReplicationReplicaFailureTest(EndToEndTest):
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(ReplicationReplicaFailureTest, self).__init__(test_context=test_context, topic=None)
+
+    @cluster(num_nodes=7)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_replication_with_replica_failure(self, metadata_quorum=quorum.zk):
+        """
+        This test verifies that replication shrinks the ISR when a replica is not fetching anymore.
+        It also verifies that replication provides simple durability guarantees by checking that data acked by
+        brokers is still available for consumption.
+
+        Setup: 1 zk/KRaft controller, 3 kafka nodes, 1 topic with partitions=1, replication-factor=3, and min.insync.replicas=2
+          - Produce messages in the background
+          - Consume messages in the background
+          - Partition a follower
+          - Validate that the ISR was shrunk
+          - Stop producing and finish consuming
+          - Validate that every acked message was consumed
+        """
+        self.create_zookeeper_if_necessary()
+        if self.zk:
+            self.zk.start()
+
+        self.create_kafka(num_nodes=3,
+                          server_prop_overrides=[["replica.lag.time.max.ms", "10000"]],
+                          controller_num_nodes_override=1)
+        self.kafka.start()
+
+        self.trogdor = TrogdorService(context=self.test_context,
+                                      client_services=[self.kafka])
+        self.trogdor.start()
+
+        # If ZK is used, the partition leader is put on the controller node
+        # to avoid partitioning the controller later on in the test.
+        if self.zk:
+            controller = self.kafka.controller()
+            assignment = [self.kafka.idx(controller)] + [self.kafka.idx(node) for node in self.kafka.nodes if node != controller]
+        else:
+            assignment = [self.kafka.idx(node) for node in self.kafka.nodes]
+
+        self.topic = "test_topic"
+        self.kafka.create_topic({"topic": self.topic,
+                                 "replica-assignment": ":".join(map(str, assignment)),
+                                 "configs": {"min.insync.replicas": 2}})
+
+        self.logger.info("Created topic %s with assignment %s", self.topic, ", ".join(map(str, assignment)))
+
+        self.create_producer()
+        self.producer.start()
+
+        self.create_consumer()
+        self.consumer.start()
+
+        self.await_startup()
+
+        leader = self.kafka.leader(self.topic, partition=0)
+        replicas = self.kafka.replicas(self.topic, partition=0)
+
+        # One of the followers is picked to be partitioned.
+        follower_to_partition = [replica for replica in replicas if replica != leader][0]
+        self.logger.info("Partitioning follower %s (%s) from the other brokers", self.kafka.idx(follower_to_partition), follower_to_partition.name)
+        partition_spec = NetworkPartitionFaultSpec(0, 5*60*1000,
+            [[follower_to_partition], [node for node in self.kafka.nodes if node != follower_to_partition]])
+        partition = self.trogdor.create_task("partition", partition_spec)
+
+        def current_isr():
+            try:
+                # Due to the network partition, the kafka-topics command could fail if it tries
+                # to connect to the partitioned broker. Therefore we catch the error here and retry.
+                return set(self.kafka.isr_idx_list(self.topic, partition=0, node=leader, offline_nodes=[follower_to_partition]))
+            except RemoteCommandError as e:
+                return set()
+
+        # Verify that ISR is shrunk.
+        expected_isr = {self.kafka.idx(replica) for replica in replicas if replica != follower_to_partition}
+        wait_until(lambda: current_isr() == expected_isr,
+                   timeout_sec=120, backoff_sec=1, err_msg="ISR should have been shrunk.")
+
+        # Wait until the network partition is removed.
+        partition.stop()
+        partition.wait_for_done(timeout_sec=300)
+
+        # Verify that ISR is expanded.
+        expected_isr = {self.kafka.idx(replica) for replica in replicas}
+        wait_until(lambda: current_isr() == expected_isr,
+                   timeout_sec=120, backoff_sec=1, err_msg="ISR should have been expanded.")
+
+        self.run_validation(producer_timeout_sec=120, min_records=25000)
diff --git a/tests/kafkatest/tests/end_to_end.py b/tests/kafkatest/tests/end_to_end.py
index bfc316e..5330565 100644
--- a/tests/kafkatest/tests/end_to_end.py
+++ b/tests/kafkatest/tests/end_to_end.py
@@ -55,6 +55,10 @@ class EndToEndTest(Test):
             self.topic: self.topic_config,
             "__consumer_offsets": group_metadata_config
         }
+
+        if self.topic:
+            topics[self.topic] = self.topic_config
+
         self.kafka = KafkaService(self.test_context, num_nodes=num_nodes,
                                   zk=self.zk, topics=topics, **kwargs)