Posted to commits@kafka.apache.org by cm...@apache.org on 2022/09/14 23:10:15 UTC

[kafka] branch 3.3 updated: MINOR: Adds KRaft versions of most streams system tests (#12458)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 2b53b609586 MINOR: Adds KRaft versions of most streams system tests (#12458)
2b53b609586 is described below

commit 2b53b609586f968104a1b516c0f129b86827ef9e
Author: Alan Sheinberg <57...@users.noreply.github.com>
AuthorDate: Fri Aug 26 14:11:19 2022 -0700

    MINOR: Adds KRaft versions of most streams system tests (#12458)
    
    Migrates Streams system tests to either use KRaft brokers or to use both KRaft and ZK in a testing matrix.
    
    This skips tests that use various forms of Kafka versioning, since those seem to have issues with KRaft at the moment. Running those tests with KRaft will require a follow-up PR.
    
    Reviewers: Guozhang Wang <gu...@apache.org>, John Roesler <vv...@apache.org>
---
 .../tests/streams/streams_broker_bounce_test.py    | 39 +++++++++++++++-------
 .../streams/streams_broker_down_resilience_test.py | 17 +++++++---
 tests/kafkatest/tests/streams/streams_eos_test.py  | 27 +++++++--------
 .../streams_named_repartition_topic_test.py        | 20 +++++++----
 .../tests/streams/streams_optimized_test.py        | 20 +++++++----
 .../tests/streams/streams_relational_smoke_test.py |  6 ++--
 .../streams/streams_shutdown_deadlock_test.py      |  5 ++-
 .../kafkatest/tests/streams/streams_smoke_test.py  |  5 +--
 .../tests/streams/streams_standby_replica_test.py  |  5 ++-
 .../streams/streams_static_membership_test.py      | 20 +++++++----
 10 files changed, 110 insertions(+), 54 deletions(-)
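
At a glance, every file below applies the same pattern: provision ZooKeeper only when the test is parameterized with a ZK quorum, hand the (possibly None) zk handle to KafkaService, and expose the quorum choice as a @matrix parameter. Here is a minimal sketch of that pattern; the class and test names are hypothetical, but the kafkatest helpers and constructor arguments are the ones used in the diffs:

    from ducktape.mark import matrix
    from ducktape.mark.resource import cluster
    from ducktape.tests.test import Test
    from kafkatest.services.kafka import KafkaService, quorum
    from kafkatest.services.zookeeper import ZookeeperService

    class ExampleQuorumTest(Test):  # hypothetical name, for illustration only
        def __init__(self, test_context):
            super(ExampleQuorumTest, self).__init__(test_context=test_context)
            # Provision ZooKeeper only under the ZK quorum; with KRaft, the
            # controller quorum manages metadata and no ZK node is needed.
            self.zk = (
                ZookeeperService(test_context, 1)
                if quorum.for_test(test_context) == quorum.zk
                else None
            )
            # zk=None is accepted under KRaft; controller_num_nodes_override=1
            # keeps the KRaft controller quorum to a single node so the test
            # stays within the same cluster footprint.
            self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk,
                                      controller_num_nodes_override=1)

        @cluster(num_nodes=5)
        @matrix(metadata_quorum=[quorum.zk, quorum.remote_kraft])
        def test_example(self, metadata_quorum):
            if self.zk:
                self.zk.start()
            self.kafka.start()
            # ... run the Streams workload under test, then tear down in
            # reverse order, stopping ZooKeeper (if any) last.
            self.kafka.stop()
            if self.zk:
                self.zk.stop()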

diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index d8992808127..c5fdfcf8743 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -18,7 +18,7 @@ from ducktape.tests.test import Test
 from ducktape.mark.resource import cluster
 from ducktape.mark import matrix
 from ducktape.mark import ignore
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
 import time
@@ -67,7 +67,8 @@ def hard_bounce(test, topic, broker_type):
         # Since this is a hard kill, we need to make sure the process is down and that
         # zookeeper has registered the loss by expiring the broker's session timeout.
 
-        wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node),
+        wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and
+                           not (quorum.for_test(test.test_context) == quorum.zk and test.kafka.is_registered(prev_broker_node)),
                    timeout_sec=test.kafka.zk_session_timeout + 5,
                    err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account))
 
@@ -151,10 +152,16 @@ class StreamsBrokerBounceTest(Test):
         
     def setup_system(self, start_processor=True, num_threads=3):
         # Setup phase
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.zk.start()
-
-        self.kafka = KafkaService(self.test_context, num_nodes=self.replication, zk=self.zk, topics=self.topics)
+        self.zk = (
+            ZookeeperService(self.test_context, 1)
+            if quorum.for_test(self.test_context) == quorum.zk
+            else None
+        )
+        if self.zk:
+            self.zk.start()
+
+        self.kafka = KafkaService(self.test_context, num_nodes=self.replication, zk=self.zk, topics=self.topics,
+                                  controller_num_nodes_override=1)
         self.kafka.start()
 
         # allow some time for topics to be created
@@ -205,11 +212,17 @@ class StreamsBrokerBounceTest(Test):
         return data
 
     @cluster(num_nodes=7)
+    @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
+            broker_type=["leader"],
+            num_threads=[1, 3],
+            sleep_time_secs=[120],
+            metadata_quorum=[quorum.remote_kraft])
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["leader", "controller"],
             num_threads=[1, 3],
-            sleep_time_secs=[120])
-    def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads):
+            sleep_time_secs=[120],
+            metadata_quorum=[quorum.zk])
+    def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads, metadata_quorum):
         """
         Start a smoke test client, then kill one particular broker and ensure data is still received.
         Record if records are delivered.
@@ -251,8 +264,9 @@ class StreamsBrokerBounceTest(Test):
 
     @cluster(num_nodes=7)
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
-            num_failures=[2])
-    def test_many_brokers_bounce(self, failure_mode, num_failures):
+            num_failures=[2],
+            metadata_quorum=quorum.all_non_upgrade)
+    def test_many_brokers_bounce(self, failure_mode, num_failures, metadata_quorum=quorum.zk):
         """
         Start a smoke test client, then kill a few brokers and ensure data is still received.
         Record if records are delivered.
@@ -269,8 +283,9 @@ class StreamsBrokerBounceTest(Test):
 
     @cluster(num_nodes=7)
     @matrix(failure_mode=["clean_bounce", "hard_bounce"],
-            num_failures=[3])
-    def test_all_brokers_bounce(self, failure_mode, num_failures):
+            num_failures=[3],
+            metadata_quorum=quorum.all_non_upgrade)
+    def test_all_brokers_bounce(self, failure_mode, num_failures, metadata_quorum=quorum.zk):
         """
         Start a smoke test client, then kill a few brokers and ensure data is still received.
         Record if records are delivered.
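
A detail worth noting in the file above: stacked @matrix decorators expand independently, so the KRaft matrix can drop broker_type=["controller"] (with a remote KRaft quorum there is no broker acting as controller to bounce) while the ZK matrix keeps it. The same technique in isolation, again with hypothetical names:

    from ducktape.mark import matrix
    from kafkatest.services.kafka import quorum

    class BounceMatrixExample:  # illustration only, not a real test class
        # Each @matrix generates one test per parameter combination, so the
        # two quorums need not share the same broker_type parameter space.
        @matrix(failure_mode=["clean_shutdown", "hard_bounce"],
                broker_type=["leader"],
                metadata_quorum=[quorum.remote_kraft])
        @matrix(failure_mode=["clean_shutdown", "hard_bounce"],
                broker_type=["leader", "controller"],
                metadata_quorum=[quorum.zk])
        def test_broker_type_bounce(self, failure_mode, broker_type, metadata_quorum):
            pass  # bounce the selected broker and verify delivery, as in the real test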
diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index 6957d699617..182c1e789b9 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -14,7 +14,9 @@
 # limitations under the License.
 
 import time
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
+from kafkatest.services.kafka import quorum
 from kafkatest.services.streams import StreamsBrokerDownResilienceService
 from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
 
@@ -39,10 +41,12 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
                                                           num_brokers=1)
 
     def setUp(self):
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
 
     @cluster(num_nodes=7)
-    def test_streams_resilient_to_broker_down(self):
+    @matrix(metadata_quorum=[quorum.remote_kraft])
+    def test_streams_resilient_to_broker_down(self, metadata_quorum):
         self.kafka.start()
 
         # Broker should be down over 2x of retries * timeout ms
@@ -78,7 +82,8 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         self.kafka.stop()
 
     @cluster(num_nodes=7)
-    def test_streams_runs_with_broker_down_initially(self):
+    @matrix(metadata_quorum=[quorum.remote_kraft])
+    def test_streams_runs_with_broker_down_initially(self, metadata_quorum):
         self.kafka.start()
         node = self.kafka.leader(self.inputTopic)
         self.kafka.stop_node(node)
@@ -145,7 +150,8 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         self.kafka.stop()
 
     @cluster(num_nodes=9)
-    def test_streams_should_scale_in_while_brokers_down(self):
+    @matrix(metadata_quorum=[quorum.remote_kraft])
+    def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum):
         self.kafka.start()
 
         # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
@@ -223,7 +229,8 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         self.kafka.stop()
 
     @cluster(num_nodes=9)
-    def test_streams_should_failover_while_brokers_down(self):
+    @matrix(metadata_quorum=[quorum.remote_kraft])
+    def test_streams_should_failover_while_brokers_down(self, metadata_quorum):
         self.kafka.start()
 
         # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py
index 618d378ade1..8d8853ac461 100644
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -13,9 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.mark import parametrize
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.kafka import quorum
 from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
     StreamsComplexEosTestJobRunnerService, StreamsEosTestVerifyRunnerService, StreamsComplexEosTestVerifyRunnerService
 
@@ -38,18 +39,18 @@ class StreamsEosTest(KafkaTest):
         self.test_context = test_context
 
     @cluster(num_nodes=9)
-    @parametrize(processing_guarantee="exactly_once")
-    @parametrize(processing_guarantee="exactly_once_v2")
-    def test_rebalance_simple(self, processing_guarantee):
+    @matrix(processing_guarantee=["exactly_once", "exactly_once_v2"],
+            metadata_quorum=[quorum.remote_kraft])
+    def test_rebalance_simple(self, processing_guarantee, metadata_quorum):
         self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
                            StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
                            StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
                            StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
 
     @cluster(num_nodes=9)
-    @parametrize(processing_guarantee="exactly_once")
-    @parametrize(processing_guarantee="exactly_once_v2")
-    def test_rebalance_complex(self, processing_guarantee):
+    @matrix(processing_guarantee=["exactly_once", "exactly_once_v2"],
+            metadata_quorum=[quorum.remote_kraft])
+    def test_rebalance_complex(self, processing_guarantee, metadata_quorum):
         self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
                            StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
                            StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
@@ -82,18 +83,18 @@ class StreamsEosTest(KafkaTest):
         verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
 
     @cluster(num_nodes=9)
-    @parametrize(processing_guarantee="exactly_once")
-    @parametrize(processing_guarantee="exactly_once_v2")
-    def test_failure_and_recovery(self, processing_guarantee):
+    @matrix(processing_guarantee=["exactly_once", "exactly_once_v2"],
+            metadata_quorum=[quorum.remote_kraft])
+    def test_failure_and_recovery(self, processing_guarantee, metadata_quorum):
         self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
                                       StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
                                       StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
                                       StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
 
     @cluster(num_nodes=9)
-    @parametrize(processing_guarantee="exactly_once")
-    @parametrize(processing_guarantee="exactly_once_v2")
-    def test_failure_and_recovery_complex(self, processing_guarantee):
+    @matrix(processing_guarantee=["exactly_once", "exactly_once_v2"],
+            metadata_quorum=[quorum.remote_kraft])
+    def test_failure_and_recovery_complex(self, processing_guarantee, metadata_quorum):
         self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
                                       StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
                                       StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
diff --git a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
index ce270d303a6..423b01c41e6 100644
--- a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
+++ b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
@@ -13,9 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.streams import StreamsNamedRepartitionTopicService
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
@@ -40,9 +41,13 @@ class StreamsNamedRepartitionTopicTest(Test):
             self.aggregation_topic: {'partitions': 6}
         }
 
-        self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
+        self.zookeeper = (
+            ZookeeperService(self.test_context, 1)
+            if quorum.for_test(self.test_context) == quorum.zk
+            else None
+        )
         self.kafka = KafkaService(self.test_context, num_nodes=3,
-                                  zk=self.zookeeper, topics=self.topics)
+                                  zk=self.zookeeper, topics=self.topics, controller_num_nodes_override=1)
 
         self.producer = VerifiableProducer(self.test_context,
                                            1,
@@ -52,8 +57,10 @@ class StreamsNamedRepartitionTopicTest(Test):
                                            acks=1)
 
     @cluster(num_nodes=8)
-    def test_upgrade_topology_with_named_repartition_topic(self):
-        self.zookeeper.start()
+    @matrix(metadata_quorum=[quorum.remote_kraft])
+    def test_upgrade_topology_with_named_repartition_topic(self, metadata_quorum):
+        if self.zookeeper:
+            self.zookeeper.start()
         self.kafka.start()
 
         processor1 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
@@ -84,7 +91,8 @@ class StreamsNamedRepartitionTopicTest(Test):
 
         self.producer.stop()
         self.kafka.stop()
-        self.zookeeper.stop()
+        if self.zookeeper:
+            self.zookeeper.stop()
 
     def verify_processing(self, processors):
         for processor in processors:
diff --git a/tests/kafkatest/tests/streams/streams_optimized_test.py b/tests/kafkatest/tests/streams/streams_optimized_test.py
index b96ec10d6ba..56506841896 100644
--- a/tests/kafkatest/tests/streams/streams_optimized_test.py
+++ b/tests/kafkatest/tests/streams/streams_optimized_test.py
@@ -13,10 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.streams import StreamsOptimizedUpgradeTestService
 from kafkatest.services.streams import StreamsResetter
 from kafkatest.services.verifiable_producer import VerifiableProducer
@@ -45,9 +46,13 @@ class StreamsOptimizedTest(Test):
             self.join_topic: {'partitions': 6}
         }
 
-        self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
+        self.zookeeper = (
+            ZookeeperService(self.test_context, 1)
+            if quorum.for_test(self.test_context) == quorum.zk
+            else None
+        )
         self.kafka = KafkaService(self.test_context, num_nodes=3,
-                                  zk=self.zookeeper, topics=self.topics)
+                                  zk=self.zookeeper, topics=self.topics, controller_num_nodes_override=1)
 
         self.producer = VerifiableProducer(self.test_context,
                                            1,
@@ -57,8 +62,10 @@ class StreamsOptimizedTest(Test):
                                            acks=1)
 
     @cluster(num_nodes=9)
-    def test_upgrade_optimized_topology(self):
-        self.zookeeper.start()
+    @matrix(metadata_quorum=[quorum.remote_kraft])
+    def test_upgrade_optimized_topology(self, metadata_quorum):
+        if self.zookeeper:
+            self.zookeeper.start()
         self.kafka.start()
 
         processor1 = StreamsOptimizedUpgradeTestService(self.test_context, self.kafka)
@@ -104,7 +111,8 @@ class StreamsOptimizedTest(Test):
         self.logger.info("teardown")
         self.producer.stop()
         self.kafka.stop()
-        self.zookeeper.stop()
+        if self.zookeeper:
+            self.zookeeper.stop()
 
     def reset_application(self):
         resetter = StreamsResetter(self.test_context, self.kafka, topic = self.input_topic, applicationId = 'StreamsOptimizedTest')
diff --git a/tests/kafkatest/tests/streams/streams_relational_smoke_test.py b/tests/kafkatest/tests/streams/streams_relational_smoke_test.py
index fe10a29369e..b91ec0c3e60 100644
--- a/tests/kafkatest/tests/streams/streams_relational_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_relational_smoke_test.py
@@ -17,6 +17,7 @@
 from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import quorum
 from kafkatest.services.streams import StreamsTestBaseService
 from kafkatest.tests.kafka_test import KafkaTest
 
@@ -85,8 +86,9 @@ class StreamsRelationalSmokeTest(KafkaTest):
 
     @cluster(num_nodes=8)
     @matrix(crash=[False, True],
-            processing_guarantee=['exactly_once', 'exactly_once_v2'])
-    def test_streams(self, crash, processing_guarantee):
+            processing_guarantee=['exactly_once', 'exactly_once_v2'],
+            metadata_quorum=[quorum.remote_kraft])
+    def test_streams(self, crash, processing_guarantee, metadata_quorum):
         driver = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "driver", "ignored", "ignored")
 
         LOG_FILE = driver.LOG_FILE  # this is the same for all instances of the service, so we can just declare a "constant"
diff --git a/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py b/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py
index d190a1c3119..3da5b481da9 100644
--- a/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py
+++ b/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py
@@ -13,8 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.kafka import quorum
 from kafkatest.services.streams import StreamsSmokeTestShutdownDeadlockService
 
 
@@ -31,7 +33,8 @@ class StreamsShutdownDeadlockTest(KafkaTest):
         self.driver = StreamsSmokeTestShutdownDeadlockService(test_context, self.kafka)
 
     @cluster(num_nodes=3)
-    def test_shutdown_wont_deadlock(self):
+    @matrix(metadata_quorum=[quorum.remote_kraft])
+    def test_shutdown_wont_deadlock(self, metadata_quorum):
         """
         Start ShutdownDeadLockTest, wait for up to 1 minute, and check that the process exited.
         If it hasn't exited then fail, as it is deadlocked.
diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py
index 6cd2b7611a3..8a119f37d99 100644
--- a/tests/kafkatest/tests/streams/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_smoke_test.py
@@ -47,8 +47,9 @@ class StreamsSmokeTest(KafkaTest):
         self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
 
     @cluster(num_nodes=8)
-    @matrix(processing_guarantee=['at_least_once'], crash=[True, False], metadata_quorum=quorum.all_non_upgrade)
-    @matrix(processing_guarantee=['exactly_once', 'exactly_once_v2'], crash=[True, False])
+    @matrix(processing_guarantee=['exactly_once', 'exactly_once_v2', 'at_least_once'],
+            crash=[True, False],
+            metadata_quorum=quorum.all_non_upgrade)
     def test_streams(self, processing_guarantee, crash, metadata_quorum=quorum.zk):
         processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
         processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
index a8c07513c1c..d11979385ac 100644
--- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -13,8 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import quorum
 from kafkatest.services.streams import StreamsStandbyTaskService
 from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
 
@@ -45,7 +47,8 @@ class StreamsStandbyTask(BaseStreamsTest):
                                                  })
 
     @cluster(num_nodes=10)
-    def test_standby_tasks_rebalance(self):
+    @matrix(metadata_quorum=[quorum.remote_kraft])
+    def test_standby_tasks_rebalance(self, metadata_quorum):
         # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
         configs = self.get_configs(
             ",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor" % (
diff --git a/tests/kafkatest/tests/streams/streams_static_membership_test.py b/tests/kafkatest/tests/streams/streams_static_membership_test.py
index 5838484be10..c2241c5e2b1 100644
--- a/tests/kafkatest/tests/streams/streams_static_membership_test.py
+++ b/tests/kafkatest/tests/streams/streams_static_membership_test.py
@@ -13,9 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.streams import StaticMemberTestService
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
@@ -38,9 +39,13 @@ class StreamsStaticMembershipTest(Test):
             self.input_topic: {'partitions': 18},
         }
 
-        self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
+        self.zookeeper = (
+            ZookeeperService(self.test_context, 1)
+            if quorum.for_test(self.test_context) == quorum.zk
+            else None
+        )
         self.kafka = KafkaService(self.test_context, num_nodes=3,
-                                  zk=self.zookeeper, topics=self.topics)
+                                  zk=self.zookeeper, topics=self.topics, controller_num_nodes_override=1)
 
         self.producer = VerifiableProducer(self.test_context,
                                            1,
@@ -50,8 +55,10 @@ class StreamsStaticMembershipTest(Test):
                                            acks=1)
 
     @cluster(num_nodes=8)
-    def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self):
-        self.zookeeper.start()
+    @matrix(metadata_quorum=[quorum.remote_kraft])
+    def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self, metadata_quorum):
+        if self.zookeeper:
+            self.zookeeper.start()
         self.kafka.start()
 
         numThreads = 3
@@ -97,7 +104,8 @@ class StreamsStaticMembershipTest(Test):
 
         self.producer.stop()
         self.kafka.stop(timeout_sec=120)
-        self.zookeeper.stop()
+        if self.zookeeper:
+            self.zookeeper.stop()
 
     def verify_processing(self, processors):
         for processor in processors: