Posted to commits@kafka.apache.org by da...@apache.org on 2023/10/27 17:33:49 UTC

(kafka) branch trunk updated: KAFKA-15578: System Tests for running old protocol with new coordinator (#14524)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 68a5072f54a KAFKA-15578: System Tests for running old protocol with new coordinator (#14524)
68a5072f54a is described below

commit 68a5072f54aa73460743b36c06f52b6263549a17
Author: Ritika Reddy <98...@users.noreply.github.com>
AuthorDate: Fri Oct 27 10:33:40 2023 -0700

    KAFKA-15578: System Tests for running old protocol with new coordinator (#14524)
    
    This patch adds configs to facilitate testing with the new group coordinator (KIP-848) in KRaft mode. Only one test file is converted at the moment; the others will follow.
    
    Reviewers: Ian McDonald <im...@confluent.io>, David Jacot <dj...@confluent.io>
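    
    A minimal usage sketch (illustration, not part of this patch): the new
    KafkaService constructor argument makes the opt-in explicit per instance,
    assuming the usual kafkatest imports and a surrounding ducktape test
    context (other constructor arguments elided):
    
        # test_context is the surrounding ducktape TestContext (assumed here).
        from kafkatest.services.kafka import KafkaService
    
        kafka = KafkaService(test_context, num_nodes=3, zk=None,
                             use_new_coordinator=True)
    
    As the resolution logic in kafka.py below shows, the flag can also arrive
    as a per-test injected arg (via @matrix) or as a run-level ducktape global.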
---
 tests/kafkatest/services/kafka/config.py           |  3 +-
 tests/kafkatest/services/kafka/config_property.py  |  4 ++
 tests/kafkatest/services/kafka/kafka.py            | 21 +++++-
 .../services/kafka/templates/log4j.properties      |  3 +
 tests/kafkatest/tests/core/consume_bench_test.py   | 75 ++++++++++++++++++----
 5 files changed, 91 insertions(+), 15 deletions(-)

diff --git a/tests/kafkatest/services/kafka/config.py b/tests/kafkatest/services/kafka/config.py
index da5b4a2f0ac..d2bdddc9449 100644
--- a/tests/kafkatest/services/kafka/config.py
+++ b/tests/kafkatest/services/kafka/config.py
@@ -28,7 +28,8 @@ class KafkaConfig(dict):
         config_property.METADATA_LOG_SEGMENT_BYTES: str(9*1024*1024), # 9 MB
         config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS: str(10*1024*1024), # 10 MB
         config_property.METADATA_LOG_RETENTION_BYTES: str(10*1024*1024), # 10 MB
-        config_property.METADATA_LOG_SEGMENT_MS: str(1*60*1000) # one minute
+        config_property.METADATA_LOG_SEGMENT_MS: str(1*60*1000), # one minute
+        config_property.NEW_GROUP_COORDINATOR_ENABLE: False
     }
 
     def __init__(self, **kwargs):
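
A defaults dict like the one above seeds every instance from the class-level
DEFAULTS and lets per-instance keyword arguments override them; a minimal
sketch of that pattern (illustration only, not the actual KafkaConfig body):

    class DefaultsDict(dict):
        DEFAULTS = {"group.coordinator.new.enable": False}

        def __init__(self, **kwargs):
            super().__init__(self.DEFAULTS)  # start from the class defaults
            self.update(kwargs)              # caller-supplied overrides win

    assert DefaultsDict()["group.coordinator.new.enable"] is False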
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
index de114224404..335cb02bcc0 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -69,6 +69,7 @@ DELEGATION_TOKEN_EXPIRY_TIME_MS="delegation.token.expiry.time.ms"
 DELEGATION_TOKEN_SECRET_KEY="delegation.token.secret.key"
 SASL_ENABLED_MECHANISMS="sasl.enabled.mechanisms"
 
+NEW_GROUP_COORDINATOR_ENABLE="group.coordinator.new.enable"
 
 """
 From KafkaConfig.scala
@@ -199,6 +200,9 @@ From KafkaConfig.scala
   val SSLEndpointIdentificationAlgorithmProp = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
   val SSLSecureRandomImplementationProp = SSLConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG
   val SSLClientAuthProp = SSLConfigs.SSL_CLIENT_AUTH_CONFIG
+
+  /** New group coordinator configs */
+  val NewGroupCoordinatorEnableProp = "group.coordinator.new.enable"
 """
 
 
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index ab48d1091d7..952b4634244 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -203,7 +203,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                  isolated_kafka=None,
                  controller_num_nodes_override=0,
                  allow_zk_with_kraft=False,
-                 quorum_info_provider=None
+                 quorum_info_provider=None,
+                 use_new_coordinator=None
                  ):
         """
         :param context: test context
@@ -264,6 +265,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         :param int controller_num_nodes_override: the number of nodes to use in the cluster, instead of 5, 3, or 1 based on num_nodes, if positive, not using ZooKeeper, and isolated_kafka is not None; ignored otherwise
         :param bool allow_zk_with_kraft: if True, then allow a KRaft broker or controller to also use ZooKeeper
         :param quorum_info_provider: A function that takes this KafkaService as an argument and returns a ServiceQuorumInfo. If this is None, then the ServiceQuorumInfo is generated from the test context
+        :param use_new_coordinator: When true, use the new implementation of the group coordinator as per KIP-848. If this is None, the default existing group coordinator is used.
         """
 
         self.zk = zk
@@ -276,6 +278,19 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.controller_quorum = None # will define below if necessary
         self.isolated_controller_quorum = None # will define below if necessary
         self.configured_for_zk_migration = False
+        
+        # Set use_new_coordinator based on context and arguments.
+        default_use_new_coordinator = False
+       
+        if use_new_coordinator is None:
+            arg_name = 'use_new_coordinator'
+            if context.injected_args is not None:
+                use_new_coordinator = context.injected_args.get(arg_name)
+            if use_new_coordinator is None:
+                use_new_coordinator = context.globals.get(arg_name, default_use_new_coordinator)
+        
+        # Assign the determined value.
+        self.use_new_coordinator = use_new_coordinator
 
         if num_nodes < 1:
             raise Exception("Must set a positive number of nodes: %i" % num_nodes)
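
The lookup above resolves the flag in three steps: an explicit constructor
argument wins, then a per-test injected arg (set by @matrix parameters), then
a run-level ducktape global, and finally the hard-coded default. A standalone
sketch of the same precedence, with the context fields stubbed for
illustration:

    # injected_args and globals_dict stand in for the ducktape context fields.
    def resolve_use_new_coordinator(explicit, injected_args, globals_dict):
        default_use_new_coordinator = False
        if explicit is not None:           # explicit constructor argument
            return explicit
        if injected_args is not None:      # per-test @matrix parameter
            value = injected_args.get('use_new_coordinator')
            if value is not None:
                return value
        # run-level global, else the hard-coded default
        return globals_dict.get('use_new_coordinator', default_use_new_coordinator)

    assert resolve_use_new_coordinator(None, {'use_new_coordinator': True}, {}) is True
    assert resolve_use_new_coordinator(None, None, {}) is False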
@@ -407,6 +422,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             kraft_broker_configs = {
                 config_property.PORT: config_property.FIRST_BROKER_PORT,
                 config_property.NODE_ID: self.idx(node),
+                config_property.NEW_GROUP_COORDINATOR_ENABLE: use_new_coordinator
             }
             kraft_broker_plus_zk_configs = kraft_broker_configs.copy()
             kraft_broker_plus_zk_configs.update(zk_broker_configs)
@@ -764,6 +780,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             else:
                 override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'false'
 
+        if self.use_new_coordinator:
+            override_configs[config_property.NEW_GROUP_COORDINATOR_ENABLE] = 'true'
+    
         for prop in self.server_prop_overrides:
             override_configs[prop[0]] = prop[1]
 
diff --git a/tests/kafkatest/services/kafka/templates/log4j.properties b/tests/kafkatest/services/kafka/templates/log4j.properties
index 5897658da49..04c9bd5d6ab 100644
--- a/tests/kafkatest/services/kafka/templates/log4j.properties
+++ b/tests/kafkatest/services/kafka/templates/log4j.properties
@@ -136,3 +136,6 @@ log4j.additivity.state.change.logger=false
 log4j.logger.kafka.authorizer.logger={{ log_level|default("DEBUG") }}, authorizerInfoAppender, authorizerDebugAppender
 log4j.additivity.kafka.authorizer.logger=false
 
+#New Group Coordinator logging.
+log4j.logger.org.apache.kafka.coordinator.group={{ log_level|default("DEBUG") }}, kafkaInfoAppender, kafkaDebugAppender
+log4j.additivity.org.apache.kafka.coordinator.group=false
diff --git a/tests/kafkatest/tests/core/consume_bench_test.py b/tests/kafkatest/tests/core/consume_bench_test.py
index ce08d80832a..07633c55330 100644
--- a/tests/kafkatest/tests/core/consume_bench_test.py
+++ b/tests/kafkatest/tests/core/consume_bench_test.py
@@ -68,9 +68,23 @@ class ConsumeBenchTest(Test):
         self.logger.debug("Produce workload finished")
 
     @cluster(num_nodes=10)
-    @matrix(topics=[["consume_bench_topic[0-5]"]], metadata_quorum=quorum.all_non_upgrade) # topic subscription
-    @matrix(topics=[["consume_bench_topic[0-5]:[0-4]"]], metadata_quorum=quorum.all_non_upgrade)  # manual topic assignment
-    def test_consume_bench(self, topics, metadata_quorum=quorum.zk):
+    @matrix(
+        topics=[
+            ["consume_bench_topic[0-5]"], # topic subscription
+            ["consume_bench_topic[0-5]:[0-4]"] # manual topic assignment
+        ],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        topics=[
+            ["consume_bench_topic[0-5]"], # topic subscription
+            ["consume_bench_topic[0-5]:[0-4]"] # manual topic assignment
+        ],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_consume_bench(self, topics, metadata_quorum=quorum.zk, use_new_coordinator=False):
         """
         Runs a ConsumeBench workload to consume messages
         """
@@ -91,8 +105,15 @@ class ConsumeBenchTest(Test):
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
 
     @cluster(num_nodes=10)
-    @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_single_partition(self, metadata_quorum=quorum.zk):
+    @matrix(
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_single_partition(self, metadata_quorum=quorum.zk, use_new_coordinator=False):
         """
         Run a ConsumeBench against a single partition
         """
@@ -114,8 +135,15 @@ class ConsumeBenchTest(Test):
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
 
     @cluster(num_nodes=10)
-    @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_multiple_consumers_random_group_topics(self, metadata_quorum=quorum.zk):
+    @matrix(
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_multiple_consumers_random_group_topics(self, metadata_quorum=quorum.zk, use_new_coordinator=False):
         """
         Runs multiple consumers group to read messages from topics.
         Since a consumerGroup isn't specified, each consumer should read from all topics independently
@@ -138,8 +166,15 @@ class ConsumeBenchTest(Test):
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
 
     @cluster(num_nodes=10)
-    @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_two_consumers_specified_group_topics(self, metadata_quorum=quorum.zk):
+    @matrix(
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_two_consumers_specified_group_topics(self, metadata_quorum=quorum.zk, use_new_coordinator=False):
         """
         Runs two consumers in the same consumer group to read messages from topics.
         Since a consumerGroup is specified, each consumer should dynamically get assigned a partition from group
@@ -163,8 +198,15 @@ class ConsumeBenchTest(Test):
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
 
     @cluster(num_nodes=10)
-    @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_multiple_consumers_random_group_partitions(self, metadata_quorum=quorum.zk):
+    @matrix(
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_multiple_consumers_random_group_partitions(self, metadata_quorum=quorum.zk, use_new_coordinator=False):
         """
         Runs multiple consumers in to read messages from specific partitions.
         Since a consumerGroup isn't specified, each consumer will get assigned a random group
@@ -188,8 +230,15 @@ class ConsumeBenchTest(Test):
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
 
     @cluster(num_nodes=10)
-    @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum=quorum.zk):
+    @matrix(
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum=quorum.zk, use_new_coordinator=False):
         """
         Runs multiple consumers in the same group to read messages from specific partitions.
         It is an invalid configuration to provide a consumer group and specific partitions.