You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/01/10 20:00:06 UTC
[kafka] branch 3.1 updated: KAFKA-13582: TestVerifiableProducer.test_multiple_kraft_security_protocols fails (#11664)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push:
new ac6ae23 KAFKA-13582: TestVerifiableProducer.test_multiple_kraft_security_protocols fails (#11664)
ac6ae23 is described below
commit ac6ae231a4750be40cb2eb72d8d78d5081470391
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Mon Jan 10 14:54:26 2022 -0500
KAFKA-13582: TestVerifiableProducer.test_multiple_kraft_security_protocols fails (#11664)
KRaft brokers always use the first controller listener, so if there is not also a colocated KRaft controller on the node be sure to only publish one controller listener in `controller.listener.names` even when the inter-controller listener name differs. System tests were failing due to unnecessarily publishing a second entry in `controller.listener.names` for a broker-only config and not also publishing a mapping for it in `listener.security.protocol.map`. Removing the unnecessary e [...]
Reviewers: David Jacot <dj...@confluent.io>
---
tests/kafkatest/services/kafka/kafka.py | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index f275853..55b5b7b 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -677,7 +677,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
advertised_listeners = []
protocol_map = []
- controller_listener_names = self.controller_listener_name_list()
+ controller_listener_names = self.controller_listener_name_list(node)
for port in self.port_mappings.values():
if port.open:
@@ -758,12 +758,17 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
KafkaService.STDOUT_STDERR_CAPTURE)
return cmd
- def controller_listener_name_list(self):
+ def controller_listener_name_list(self, node):
if self.quorum_info.using_zk:
return []
broker_to_controller_listener_name = self.controller_listener_name(self.controller_quorum.controller_security_protocol)
- return [broker_to_controller_listener_name] if (self.controller_quorum.intercontroller_security_protocol == self.controller_quorum.controller_security_protocol) \
- else [broker_to_controller_listener_name, self.controller_listener_name(self.controller_quorum.intercontroller_security_protocol)]
+ # Brokers always use the first controller listener, so include a second, inter-controller listener if and only if:
+ # 1) the node is a controller node
+ # 2) the inter-controller listener name differs from the broker-to-controller listener name
+ return [broker_to_controller_listener_name, self.controller_listener_name(self.controller_quorum.intercontroller_security_protocol)] \
+ if (quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role and
+ self.controller_quorum.intercontroller_security_protocol != self.controller_quorum.controller_security_protocol) \
+ else [broker_to_controller_listener_name]
def start_node(self, node, timeout_sec=60):
if node not in self.nodes_to_start:
@@ -772,7 +777,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node)
if self.quorum_info.has_controllers:
- for controller_listener in self.controller_listener_name_list():
+ for controller_listener in self.controller_listener_name_list(node):
if self.node_quorum_info.has_controller_role:
self.open_port(controller_listener)
else: # co-located case where node doesn't have a controller
@@ -793,7 +798,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
# define controller.listener.names
- self.controller_listener_names = ','.join(self.controller_listener_name_list())
+ self.controller_listener_names = ','.join(self.controller_listener_name_list(node))
# define sasl.mechanism.controller.protocol to match remote quorum if one exists
if self.remote_controller_quorum:
self.controller_sasl_mechanism = self.remote_controller_quorum.controller_sasl_mechanism