You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/01/10 20:00:06 UTC

[kafka] branch 3.1 updated: KAFKA-13582: TestVerifiableProducer.test_multiple_kraft_security_protocols fails (#11664)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new ac6ae23  KAFKA-13582: TestVerifiableProducer.test_multiple_kraft_security_protocols fails (#11664)
ac6ae23 is described below

commit ac6ae231a4750be40cb2eb72d8d78d5081470391
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Mon Jan 10 14:54:26 2022 -0500

    KAFKA-13582: TestVerifiableProducer.test_multiple_kraft_security_protocols fails (#11664)
    
    KRaft brokers always use the first controller listener, so if there is not also a colocated KRaft controller on the node be sure to only publish one controller listener in `controller.listener.names` even when the inter-controller listener name differs.  System tests were failing due to unnecessarily publishing a second entry in `controller.listener.names` for a broker-only config and not also publishing a mapping for it in `listener.security.protocol.map`.  Removing the unnecessary e [...]
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 tests/kafkatest/services/kafka/kafka.py | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index f275853..55b5b7b 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -677,7 +677,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         advertised_listeners = []
         protocol_map = []
 
-        controller_listener_names = self.controller_listener_name_list()
+        controller_listener_names = self.controller_listener_name_list(node)
 
         for port in self.port_mappings.values():
             if port.open:
@@ -758,12 +758,17 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                 KafkaService.STDOUT_STDERR_CAPTURE)
         return cmd
 
-    def controller_listener_name_list(self):
+    def controller_listener_name_list(self, node):
         if self.quorum_info.using_zk:
             return []
         broker_to_controller_listener_name = self.controller_listener_name(self.controller_quorum.controller_security_protocol)
-        return [broker_to_controller_listener_name] if (self.controller_quorum.intercontroller_security_protocol == self.controller_quorum.controller_security_protocol) \
-            else [broker_to_controller_listener_name, self.controller_listener_name(self.controller_quorum.intercontroller_security_protocol)]
+        # Brokers always use the first controller listener, so include a second, inter-controller listener if and only if:
+        # 1) the node is a controller node
+        # 2) the inter-controller listener name differs from the broker-to-controller listener name
+        return [broker_to_controller_listener_name, self.controller_listener_name(self.controller_quorum.intercontroller_security_protocol)] \
+            if (quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role and
+                self.controller_quorum.intercontroller_security_protocol != self.controller_quorum.controller_security_protocol) \
+            else [broker_to_controller_listener_name]
 
     def start_node(self, node, timeout_sec=60):
         if node not in self.nodes_to_start:
@@ -772,7 +777,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
         self.node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node)
         if self.quorum_info.has_controllers:
-            for controller_listener in self.controller_listener_name_list():
+            for controller_listener in self.controller_listener_name_list(node):
                 if self.node_quorum_info.has_controller_role:
                     self.open_port(controller_listener)
                 else: # co-located case where node doesn't have a controller
@@ -793,7 +798,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                                                        KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
                                                       for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
             # define controller.listener.names
-            self.controller_listener_names = ','.join(self.controller_listener_name_list())
+            self.controller_listener_names = ','.join(self.controller_listener_name_list(node))
             # define sasl.mechanism.controller.protocol to match remote quorum if one exists
             if self.remote_controller_quorum:
                 self.controller_sasl_mechanism = self.remote_controller_quorum.controller_sasl_mechanism