You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/10/27 17:33:49 UTC
(kafka) branch trunk updated: KAFKA-15578: System Tests for running old protocol with new coordinator (#14524)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 68a5072f54a KAFKA-15578: System Tests for running old protocol with new coordinator (#14524)
68a5072f54a is described below
commit 68a5072f54aa73460743b36c06f52b6263549a17
Author: Ritika Reddy <98...@users.noreply.github.com>
AuthorDate: Fri Oct 27 10:33:40 2023 -0700
KAFKA-15578: System Tests for running old protocol with new coordinator (#14524)
This patch adds configs to facilitate the testing with the new group coordinator (KIP-848) in kraft mode. Only one test files is converted at the moment. The others will follow.
Reviewers: Ian McDonald <im...@confluent.io>, David Jacot <dj...@confluent.io>
---
tests/kafkatest/services/kafka/config.py | 3 +-
tests/kafkatest/services/kafka/config_property.py | 4 ++
tests/kafkatest/services/kafka/kafka.py | 21 +++++-
.../services/kafka/templates/log4j.properties | 3 +
tests/kafkatest/tests/core/consume_bench_test.py | 75 ++++++++++++++++++----
5 files changed, 91 insertions(+), 15 deletions(-)
diff --git a/tests/kafkatest/services/kafka/config.py b/tests/kafkatest/services/kafka/config.py
index da5b4a2f0ac..d2bdddc9449 100644
--- a/tests/kafkatest/services/kafka/config.py
+++ b/tests/kafkatest/services/kafka/config.py
@@ -28,7 +28,8 @@ class KafkaConfig(dict):
config_property.METADATA_LOG_SEGMENT_BYTES: str(9*1024*1024), # 9 MB
config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS: str(10*1024*1024), # 10 MB
config_property.METADATA_LOG_RETENTION_BYTES: str(10*1024*1024), # 10 MB
- config_property.METADATA_LOG_SEGMENT_MS: str(1*60*1000) # one minute
+ config_property.METADATA_LOG_SEGMENT_MS: str(1*60*1000), # one minute
+ config_property.NEW_GROUP_COORDINATOR_ENABLE: False
}
def __init__(self, **kwargs):
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
index de114224404..335cb02bcc0 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -69,6 +69,7 @@ DELEGATION_TOKEN_EXPIRY_TIME_MS="delegation.token.expiry.time.ms"
DELEGATION_TOKEN_SECRET_KEY="delegation.token.secret.key"
SASL_ENABLED_MECHANISMS="sasl.enabled.mechanisms"
+NEW_GROUP_COORDINATOR_ENABLE="group.coordinator.new.enable"
"""
From KafkaConfig.scala
@@ -199,6 +200,9 @@ From KafkaConfig.scala
val SSLEndpointIdentificationAlgorithmProp = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
val SSLSecureRandomImplementationProp = SSLConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG
val SSLClientAuthProp = SSLConfigs.SSL_CLIENT_AUTH_CONFIG
+
+ /** New group coordinator configs */
+ val NewGroupCoordinatorEnableProp = "group.coordinator.new.enable"
"""
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index ab48d1091d7..952b4634244 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -203,7 +203,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
isolated_kafka=None,
controller_num_nodes_override=0,
allow_zk_with_kraft=False,
- quorum_info_provider=None
+ quorum_info_provider=None,
+ use_new_coordinator=None
):
"""
:param context: test context
@@ -264,6 +265,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
:param int controller_num_nodes_override: the number of nodes to use in the cluster, instead of 5, 3, or 1 based on num_nodes, if positive, not using ZooKeeper, and isolated_kafka is not None; ignored otherwise
:param bool allow_zk_with_kraft: if True, then allow a KRaft broker or controller to also use ZooKeeper
:param quorum_info_provider: A function that takes this KafkaService as an argument and returns a ServiceQuorumInfo. If this is None, then the ServiceQuorumInfo is generated from the test context
+ :param use_new_coordinator: When true, use the new implementation of the group coordinator as per KIP-848. If this is None, the default existing group coordinator is used.
"""
self.zk = zk
@@ -276,6 +278,19 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.controller_quorum = None # will define below if necessary
self.isolated_controller_quorum = None # will define below if necessary
self.configured_for_zk_migration = False
+
+ # Set use_new_coordinator based on context and arguments.
+ default_use_new_coordinator = False
+
+ if use_new_coordinator is None:
+ arg_name = 'use_new_coordinator'
+ if context.injected_args is not None:
+ use_new_coordinator = context.injected_args.get(arg_name)
+ if use_new_coordinator is None:
+ use_new_coordinator = context.globals.get(arg_name, default_use_new_coordinator)
+
+ # Assign the determined value.
+ self.use_new_coordinator = use_new_coordinator
if num_nodes < 1:
raise Exception("Must set a positive number of nodes: %i" % num_nodes)
@@ -407,6 +422,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
kraft_broker_configs = {
config_property.PORT: config_property.FIRST_BROKER_PORT,
config_property.NODE_ID: self.idx(node),
+ config_property.NEW_GROUP_COORDINATOR_ENABLE: use_new_coordinator
}
kraft_broker_plus_zk_configs = kraft_broker_configs.copy()
kraft_broker_plus_zk_configs.update(zk_broker_configs)
@@ -764,6 +780,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
else:
override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'false'
+ if self.use_new_coordinator:
+ override_configs[config_property.NEW_GROUP_COORDINATOR_ENABLE] = 'true'
+
for prop in self.server_prop_overrides:
override_configs[prop[0]] = prop[1]
diff --git a/tests/kafkatest/services/kafka/templates/log4j.properties b/tests/kafkatest/services/kafka/templates/log4j.properties
index 5897658da49..04c9bd5d6ab 100644
--- a/tests/kafkatest/services/kafka/templates/log4j.properties
+++ b/tests/kafkatest/services/kafka/templates/log4j.properties
@@ -136,3 +136,6 @@ log4j.additivity.state.change.logger=false
log4j.logger.kafka.authorizer.logger={{ log_level|default("DEBUG") }}, authorizerInfoAppender, authorizerDebugAppender
log4j.additivity.kafka.authorizer.logger=false
+#New Group Coordinator logging.
+log4j.logger.org.apache.kafka.coordinator.group={{ log_level|default("DEBUG") }}, kafkaInfoAppender, kafkaDebugAppender
+log4j.additivity.org.apache.kafka.coordinator.group=false
diff --git a/tests/kafkatest/tests/core/consume_bench_test.py b/tests/kafkatest/tests/core/consume_bench_test.py
index ce08d80832a..07633c55330 100644
--- a/tests/kafkatest/tests/core/consume_bench_test.py
+++ b/tests/kafkatest/tests/core/consume_bench_test.py
@@ -68,9 +68,23 @@ class ConsumeBenchTest(Test):
self.logger.debug("Produce workload finished")
@cluster(num_nodes=10)
- @matrix(topics=[["consume_bench_topic[0-5]"]], metadata_quorum=quorum.all_non_upgrade) # topic subscription
- @matrix(topics=[["consume_bench_topic[0-5]:[0-4]"]], metadata_quorum=quorum.all_non_upgrade) # manual topic assignment
- def test_consume_bench(self, topics, metadata_quorum=quorum.zk):
+ @matrix(
+ topics=[
+ ["consume_bench_topic[0-5]"], # topic subscription
+ ["consume_bench_topic[0-5]:[0-4]"] # manual topic assignment
+ ],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ topics=[
+ ["consume_bench_topic[0-5]"], # topic subscription
+ ["consume_bench_topic[0-5]:[0-4]"] # manual topic assignment
+ ],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_consume_bench(self, topics, metadata_quorum=quorum.zk, use_new_coordinator=False):
"""
Runs a ConsumeBench workload to consume messages
"""
@@ -91,8 +105,15 @@ class ConsumeBenchTest(Test):
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
@cluster(num_nodes=10)
- @matrix(metadata_quorum=quorum.all_non_upgrade)
- def test_single_partition(self, metadata_quorum=quorum.zk):
+ @matrix(
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_single_partition(self, metadata_quorum=quorum.zk, use_new_coordinator=False):
"""
Run a ConsumeBench against a single partition
"""
@@ -114,8 +135,15 @@ class ConsumeBenchTest(Test):
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
@cluster(num_nodes=10)
- @matrix(metadata_quorum=quorum.all_non_upgrade)
- def test_multiple_consumers_random_group_topics(self, metadata_quorum=quorum.zk):
+ @matrix(
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_multiple_consumers_random_group_topics(self, metadata_quorum=quorum.zk, use_new_coordinator=False):
"""
Runs multiple consumers group to read messages from topics.
Since a consumerGroup isn't specified, each consumer should read from all topics independently
@@ -138,8 +166,15 @@ class ConsumeBenchTest(Test):
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
@cluster(num_nodes=10)
- @matrix(metadata_quorum=quorum.all_non_upgrade)
- def test_two_consumers_specified_group_topics(self, metadata_quorum=quorum.zk):
+ @matrix(
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_two_consumers_specified_group_topics(self, metadata_quorum=quorum.zk, use_new_coordinator=False):
"""
Runs two consumers in the same consumer group to read messages from topics.
Since a consumerGroup is specified, each consumer should dynamically get assigned a partition from group
@@ -163,8 +198,15 @@ class ConsumeBenchTest(Test):
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
@cluster(num_nodes=10)
- @matrix(metadata_quorum=quorum.all_non_upgrade)
- def test_multiple_consumers_random_group_partitions(self, metadata_quorum=quorum.zk):
+ @matrix(
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_multiple_consumers_random_group_partitions(self, metadata_quorum=quorum.zk, use_new_coordinator=False):
"""
Runs multiple consumers in to read messages from specific partitions.
Since a consumerGroup isn't specified, each consumer will get assigned a random group
@@ -188,8 +230,15 @@ class ConsumeBenchTest(Test):
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
@cluster(num_nodes=10)
- @matrix(metadata_quorum=quorum.all_non_upgrade)
- def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum=quorum.zk):
+ @matrix(
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum=quorum.zk, use_new_coordinator=False):
"""
Runs multiple consumers in the same group to read messages from specific partitions.
It is an invalid configuration to provide a consumer group and specific partitions.