You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by lu...@apache.org on 2024/03/22 13:57:45 UTC
(kafka) branch trunk updated: KAFKA-16274: Update replica_scale_test.py to support KIP-848’s group protocol config (#15577)
This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f66610095c6 KAFKA-16274: Update replica_scale_test.py to support KIP-848’s group protocol config (#15577)
f66610095c6 is described below
commit f66610095c6342260ea21f994db0570b1a3c5e51
Author: Kirk True <ki...@kirktrue.pro>
AuthorDate: Fri Mar 22 06:57:37 2024 -0700
KAFKA-16274: Update replica_scale_test.py to support KIP-848’s group protocol config (#15577)
Added a new optional group_protocol parameter to the test methods, then passed that down to the methods involved.
Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of simply adding group_protocol=["classic", "consumer"] to the existing blocks 😢
Reviewers: Lucas Brutschy <lb...@confluent.io>
---
tests/kafkatest/tests/core/replica_scale_test.py | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git a/tests/kafkatest/tests/core/replica_scale_test.py b/tests/kafkatest/tests/core/replica_scale_test.py
index 22555d202d3..8500c0bf302 100644
--- a/tests/kafkatest/tests/core/replica_scale_test.py
+++ b/tests/kafkatest/tests/core/replica_scale_test.py
@@ -20,7 +20,7 @@ from ducktape.tests.test import Test
from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec
from kafkatest.services.trogdor.task_spec import TaskSpec
-from kafkatest.services.kafka import KafkaService, quorum
+from kafkatest.services.kafka import KafkaService, quorum, consumer_group
from kafkatest.services.trogdor.trogdor import TrogdorService
from kafkatest.services.zookeeper import ZookeeperService
@@ -52,7 +52,7 @@ class ReplicaScaleTest(Test):
topic_count=[50],
partition_count=[34],
replication_factor=[3],
- metadata_quorum=[quorum.zk],
+ metadata_quorum=[quorum.zk, quorum.isolated_kraft],
use_new_coordinator=[False]
)
@matrix(
@@ -60,10 +60,11 @@ class ReplicaScaleTest(Test):
partition_count=[34],
replication_factor=[3],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True, False]
+ use_new_coordinator=[True],
+ group_protocol=consumer_group.all_group_protocols
)
def test_produce_consume(self, topic_count, partition_count, replication_factor,
- metadata_quorum=quorum.zk, use_new_coordinator=False):
+ metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
topics_create_start_time = time.time()
for i in range(topic_count):
topic = "replicas_produce_consume_%d" % i
@@ -101,12 +102,13 @@ class ReplicaScaleTest(Test):
produce_workload.wait_for_done(timeout_sec=600)
print("Completed produce bench", flush=True) # Force some stdout for Travis
+ consumer_conf = consumer_group.maybe_set_group_protocol(group_protocol)
consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
consumer_workload_service.consumer_node,
consumer_workload_service.bootstrap_servers,
target_messages_per_sec=150000,
max_messages=3400000,
- consumer_conf={},
+ consumer_conf=consumer_conf,
admin_client_conf={},
common_client_conf={},
active_topics=["replicas_produce_consume_[0-2]"])