Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/22 21:58:56 UTC

[kafka] branch trunk updated: MINOR: Test the new KIP-500 quorum mode in ducktape (#10105)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0711d155 MINOR: Test the new KIP-500 quorum mode in ducktape (#10105)
0711d155 is described below

commit 0711d1558250c60807dfa815d46907eac8bb4b98
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Mon Feb 22 16:57:17 2021 -0500

    MINOR: Test the new KIP-500 quorum mode in ducktape (#10105)
    
    Add the test annotations needed to exercise the new KIP-500 quorum broker
    mode in many of our ducktape tests. This mode is tested in addition to the
    classic Apache ZooKeeper mode.
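
    As an illustration of the pattern (a condensed sketch, not a verbatim
    excerpt; the class and test names here are made up, but the quorum helpers,
    annotations and services are the ones used in the diff below):

        from ducktape.mark import matrix
        from ducktape.mark.resource import cluster
        from ducktape.tests.test import Test

        from kafkatest.services.kafka import KafkaService, quorum
        from kafkatest.services.zookeeper import ZookeeperService

        class ExampleTest(Test):
            def __init__(self, test_context):
                super(ExampleTest, self).__init__(test_context)
                # Only create ZooKeeper when parameterized for the ZK quorum
                self.zk = ZookeeperService(test_context, num_nodes=1) \
                    if quorum.for_test(test_context) == quorum.zk else None
                self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
                                          topics={"topic": {"partitions": 1,
                                                            "replication-factor": 1}})

            def setUp(self):
                # ZooKeeper is skipped entirely when running with a Raft quorum
                if self.zk:
                    self.zk.start()

            @cluster(num_nodes=4)
            @matrix(metadata_quorum=quorum.all_non_upgrade)
            def test_example(self, metadata_quorum=quorum.zk):
                self.kafka.start()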
    
    This PR also adds a new sanity_checks/test_bounce.py system test that runs
    through a simple produce/bounce/produce series of events.
    
    Finally, this PR adds @cluster annotations to dozens of system tests that
    were missing them. Without this annotation, those tests grabbed the entire
    cluster of nodes; adding it dramatically reduces the time needed to run
    them.
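
    The @cluster annotation is stacked above each @parametrize/@matrix so that
    a given parameterization only reserves the nodes it actually needs. For
    example, the new test_bounce.py in the diff below declares (excerpted):

        @cluster(num_nodes=6)
        @parametrize(metadata_quorum=quorum.remote_raft)
        @cluster(num_nodes=4)
        @parametrize(metadata_quorum=quorum.colocated_raft)
        @cluster(num_nodes=4)
        @parametrize(metadata_quorum=quorum.zk)
        def test_simple_run(self, metadata_quorum):
            ...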
    
    Reviewers: Colin P. McCabe <cm...@apache.org>, Ismael Juma <is...@juma.me.uk>
---
 tests/kafkatest/sanity_checks/test_bounce.py       | 72 ++++++++++++++++++++++
 .../sanity_checks/test_console_consumer.py         | 16 ++---
 .../sanity_checks/test_performance_services.py     | 16 +++--
 .../sanity_checks/test_verifiable_producer.py      | 32 +++++++---
 tests/kafkatest/services/console_consumer.py       |  9 +--
 .../services/performance/consumer_performance.py   | 10 +--
 .../services/performance/end_to_end_latency.py     | 13 ++--
 .../services/performance/producer_performance.py   |  4 +-
 .../client/client_compatibility_features_test.py   | 19 +++---
 .../client_compatibility_produce_consume_test.py   | 15 +++--
 tests/kafkatest/tests/client/compression_test.py   | 13 ++--
 .../tests/client/consumer_rolling_upgrade_test.py  |  8 ++-
 tests/kafkatest/tests/client/consumer_test.py      | 36 ++++++-----
 .../tests/client/message_format_change_test.py     | 24 ++++----
 tests/kafkatest/tests/client/pluggable_test.py     |  7 ++-
 tests/kafkatest/tests/connect/connect_test.py      | 16 ++---
 .../core/compatibility_test_new_broker_test.py     | 61 +++++++++---------
 tests/kafkatest/tests/core/consume_bench_test.py   | 40 ++++++++----
 .../tests/core/consumer_group_command_test.py      | 18 +++---
 .../kafkatest/tests/core/delegation_token_test.py  |  2 +
 tests/kafkatest/tests/core/downgrade_test.py       |  2 +-
 .../tests/core/fetch_from_follower_test.py         | 14 +++--
 .../kafkatest/tests/core/get_offset_shell_test.py  | 23 ++++---
 .../tests/core/group_mode_transactions_test.py     | 29 ++++++---
 tests/kafkatest/tests/core/produce_bench_test.py   | 21 ++++---
 tests/kafkatest/tests/core/replica_scale_test.py   | 22 ++++---
 tests/kafkatest/tests/core/replication_test.py     | 50 ++++++++++-----
 .../kafkatest/tests/core/round_trip_fault_test.py  | 54 ++++++++++++----
 tests/kafkatest/tests/core/security_test.py        | 14 +++--
 tests/kafkatest/tests/core/transactions_test.py    | 31 +++++++---
 tests/kafkatest/tests/core/upgrade_test.py         |  4 +-
 tests/kafkatest/tests/end_to_end.py                |  6 +-
 tests/kafkatest/tests/kafka_test.py                | 10 +--
 .../streams/streams_application_upgrade_test.py    |  2 +
 .../streams/streams_broker_compatibility_test.py   |  5 ++
 .../streams/streams_broker_down_resilience_test.py |  5 ++
 .../streams_cooperative_rebalance_upgrade_test.py  |  2 +
 .../tests/streams/streams_optimized_test.py        |  3 +-
 .../streams/streams_shutdown_deadlock_test.py      |  2 +
 .../kafkatest/tests/streams/streams_smoke_test.py  |  5 +-
 .../tests/streams/streams_standby_replica_test.py  |  2 +
 .../streams/streams_static_membership_test.py      |  2 +
 .../tests/streams/streams_upgrade_test.py          |  2 +
 tests/kafkatest/tests/tools/log4j_appender_test.py | 16 ++---
 tests/kafkatest/tests/tools/log_compaction_test.py | 14 +++--
 .../tests/tools/replica_verification_test.py       | 16 +++--
 tests/kafkatest/version.py                         | 12 ++++
 47 files changed, 538 insertions(+), 261 deletions(-)

diff --git a/tests/kafkatest/sanity_checks/test_bounce.py b/tests/kafkatest/sanity_checks/test_bounce.py
new file mode 100644
index 0000000..c01f23b
--- /dev/null
+++ b/tests/kafkatest/sanity_checks/test_bounce.py
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import KafkaService, quorum
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.zookeeper import ZookeeperService
+
+
+class TestBounce(Test):
+    """Sanity checks on verifiable producer service class with cluster roll."""
+    def __init__(self, test_context):
+        super(TestBounce, self).__init__(test_context)
+
+        self.topic = "topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
+        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
+                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}},
+                                  controller_num_nodes_override=3 if quorum.for_test(test_context) == quorum.remote_raft else 1)
+        self.num_messages = 1000
+
+    def create_producer(self):
+        # This will produce to source kafka cluster
+        self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
+                                           max_messages=self.num_messages, throughput=self.num_messages // 10)
+    def setUp(self):
+        if self.zk:
+            self.zk.start()
+
+    @cluster(num_nodes=6)
+    @parametrize(metadata_quorum=quorum.remote_raft)
+    @cluster(num_nodes=4)
+    @parametrize(metadata_quorum=quorum.colocated_raft)
+    @cluster(num_nodes=4)
+    @parametrize(metadata_quorum=quorum.zk)
+    def test_simple_run(self, metadata_quorum):
+        """
+        Test that we can start VerifiableProducer on the current branch snapshot version, and
+        verify that we can produce a small number of messages both before and after a subsequent roll.
+        """
+        self.kafka.start()
+        for first_time in [True, False]:
+            self.create_producer()
+            self.producer.start()
+            wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15,
+                       err_msg="Producer failed to start in a reasonable amount of time.")
+
+            self.producer.wait()
+            num_produced = self.producer.num_acked
+            assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)
+            if first_time:
+                self.producer.stop()
+                if self.kafka.quorum_info.using_raft and self.kafka.remote_controller_quorum:
+                    self.kafka.remote_controller_quorum.restart_cluster()
+                self.kafka.restart_cluster()
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 686cd42..0847ce0 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -21,7 +21,7 @@ from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
 
 from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.utils.remote_account import line_count, file_exists
@@ -34,20 +34,22 @@ class ConsoleConsumerTest(Test):
         super(ConsoleConsumerTest, self).__init__(test_context)
 
         self.topic = "topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
         self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, zk_chroot="/kafka",
                                   topics={self.topic: {"partitions": 1, "replication-factor": 1}})
         self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic)
 
     def setUp(self):
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
 
     @cluster(num_nodes=3)
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all_raft)
     @cluster(num_nodes=4)
-    @matrix(security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'])
-    @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'])
-    def test_lifecycle(self, security_protocol, sasl_mechanism='GSSAPI'):
+    @matrix(security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN'], metadata_quorum=quorum.all_raft)
+    @matrix(security_protocol=['SASL_SSL'], sasl_mechanism=['SCRAM-SHA-256', 'SCRAM-SHA-512']) # SCRAM not yet supported with Raft
+    @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], metadata_quorum=quorum.all_raft)
+    def test_lifecycle(self, security_protocol, sasl_mechanism='GSSAPI', metadata_quorum=quorum.zk):
         """Check that console consumer starts/stops properly, and that we are capturing log output."""
 
         self.kafka.security_protocol = security_protocol
diff --git a/tests/kafkatest/sanity_checks/test_performance_services.py b/tests/kafkatest/sanity_checks/test_performance_services.py
index 280152c..f0d1a48 100644
--- a/tests/kafkatest/sanity_checks/test_performance_services.py
+++ b/tests/kafkatest/sanity_checks/test_performance_services.py
@@ -13,11 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.mark import parametrize
+from ducktape.mark import matrix, parametrize
 from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService
 from kafkatest.services.performance import latency, compute_aggregate_throughput
 from kafkatest.services.zookeeper import ZookeeperService
@@ -31,10 +31,11 @@ class PerformanceServiceTest(Test):
         self.num_records = 10000
         self.topic = "topic"
 
-        self.zk = ZookeeperService(test_context, 1)
+        self.zk = ZookeeperService(test_context, 1) if quorum.for_test(test_context) == quorum.zk else None
 
     def setUp(self):
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
 
     @cluster(num_nodes=5)
     # We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check,
@@ -43,8 +44,9 @@ class PerformanceServiceTest(Test):
     @parametrize(version=str(LATEST_0_9), new_consumer=False)
     @parametrize(version=str(LATEST_0_9))
     @parametrize(version=str(LATEST_1_1), new_consumer=False)
-    @parametrize(version=str(DEV_BRANCH))
-    def test_version(self, version=str(LATEST_0_9), new_consumer=True):
+    @cluster(num_nodes=5)
+    @matrix(version=[str(DEV_BRANCH)], metadata_quorum=quorum.all)
+    def test_version(self, version=str(LATEST_0_9), new_consumer=True, metadata_quorum=quorum.zk):
         """
         Sanity check out producer performance service - verify that we can run the service with a small
         number of messages. The actual stats here are pretty meaningless since the number of messages is quite small.
@@ -67,6 +69,7 @@ class PerformanceServiceTest(Test):
                 'buffer.memory': 64*1024*1024})
         self.producer_perf.run()
         producer_perf_data = compute_aggregate_throughput(self.producer_perf)
+        assert producer_perf_data['records_per_sec'] > 0
 
         # check basic run of end to end latency
         self.end_to_end = EndToEndLatencyService(
@@ -82,6 +85,7 @@ class PerformanceServiceTest(Test):
         self.consumer_perf.group = "test-consumer-group"
         self.consumer_perf.run()
         consumer_perf_data = compute_aggregate_throughput(self.consumer_perf)
+        assert consumer_perf_data['records_per_sec'] > 0
 
         return {
             "producer_performance": producer_perf_data,
diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
index 5a95e48..32961f1 100644
--- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py
+++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
@@ -14,12 +14,12 @@
 # limitations under the License.
 
 
-from ducktape.mark import parametrize
+from ducktape.mark import matrix, parametrize
 from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
 
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.utils import is_version
@@ -32,7 +32,7 @@ class TestVerifiableProducer(Test):
         super(TestVerifiableProducer, self).__init__(test_context)
 
         self.topic = "topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
         self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
                                   topics={self.topic: {"partitions": 1, "replication-factor": 1}})
 
@@ -41,24 +41,40 @@ class TestVerifiableProducer(Test):
         self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
                                            max_messages=self.num_messages, throughput=self.num_messages // 10)
     def setUp(self):
-        self.zk.start()
-        self.kafka.start()
+        if self.zk:
+            self.zk.start()
 
     @cluster(num_nodes=3)
     @parametrize(producer_version=str(LATEST_0_8_2))
     @parametrize(producer_version=str(LATEST_0_9))
     @parametrize(producer_version=str(LATEST_0_10_0))
     @parametrize(producer_version=str(LATEST_0_10_1))
-    @parametrize(producer_version=str(DEV_BRANCH))
-    def test_simple_run(self, producer_version=DEV_BRANCH):
+    @matrix(producer_version=[str(DEV_BRANCH)], security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all)
+    @cluster(num_nodes=4)
+    @matrix(producer_version=[str(DEV_BRANCH)], security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN', 'GSSAPI'],
+            metadata_quorum=quorum.all)
+    def test_simple_run(self, producer_version, security_protocol = 'PLAINTEXT', sasl_mechanism='PLAIN',
+                        metadata_quorum=quorum.zk):
         """
         Test that we can start VerifiableProducer on the current branch snapshot version or against the 0.8.2 jar, and
         verify that we can produce a small number of messages.
         """
+        self.kafka.security_protocol = security_protocol
+        self.kafka.client_sasl_mechanism = sasl_mechanism
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.interbroker_sasl_mechanism = sasl_mechanism
+        if self.kafka.quorum_info.using_raft:
+            controller_quorum = self.kafka.controller_quorum
+            controller_quorum.controller_security_protocol = security_protocol
+            controller_quorum.controller_sasl_mechanism = sasl_mechanism
+            controller_quorum.intercontroller_security_protocol = security_protocol
+            controller_quorum.intercontroller_sasl_mechanism = sasl_mechanism
+        self.kafka.start()
+
         node = self.producer.nodes[0]
         node.version = KafkaVersion(producer_version)
         self.producer.start()
-        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5,
+        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15,
              err_msg="Producer failed to start in a reasonable amount of time.")
 
         # using version.vstring (distutils.version.LooseVersion) is a tricky way of ensuring
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 14b450c..32e7145 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -21,7 +21,7 @@ from ducktape.utils.util import wait_until
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.monitor.jmx import JmxMixin, JmxTool
-from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_9_0_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0
+from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0
 from kafkatest.services.kafka.util import fix_opts_for_new_jvm
 
 """
@@ -151,7 +151,9 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
     def start_cmd(self, node):
         """Return the start command appropriate for the given node."""
         args = self.args.copy()
-        args['zk_connect'] = self.kafka.zk_connect_setting()
+        args['broker_list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
+        if not self.new_consumer:
+            args['zk_connect'] = self.kafka.zk_connect_setting()
         args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
         args['stderr'] = ConsoleConsumer.STDERR_CAPTURE
         args['log_dir'] = ConsoleConsumer.LOG_DIR
@@ -160,7 +162,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
         args['jmx_port'] = self.jmx_port
         args['console_consumer'] = self.path.script("kafka-console-consumer.sh", node)
-        args['broker_list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
 
         if self.kafka_opts_override:
             args['kafka_opts'] = "\"%s\"" % self.kafka_opts_override
@@ -177,7 +178,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
               "--consumer.config %(config_file)s " % args
 
         if self.new_consumer:
-            assert node.version >= V_0_9_0_0, \
+            assert node.version.consumer_supports_bootstrap_server(), \
                 "new_consumer is only supported if version >= 0.9.0.0, version %s" % str(node.version)
             if node.version <= LATEST_0_10_0:
                 cmd += " --new-consumer"
diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py
index 930a68f..6df8dfb 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -18,7 +18,7 @@ import os
 
 from kafkatest.services.performance import PerformanceService
 from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.version import DEV_BRANCH, V_0_9_0_0, V_2_0_0, LATEST_0_10_0
+from kafkatest.version import DEV_BRANCH, V_2_0_0, LATEST_0_10_0
 
 
 class ConsumerPerformanceService(PerformanceService):
@@ -79,14 +79,14 @@ class ConsumerPerformanceService(PerformanceService):
         self.new_consumer = new_consumer
         self.settings = settings
 
-        assert version >= V_0_9_0_0 or (not new_consumer), \
+        assert version.consumer_supports_bootstrap_server() or (not new_consumer), \
             "new_consumer is only supported if version >= 0.9.0.0, version %s" % str(version)
 
         assert version < V_2_0_0 or new_consumer, \
             "new_consumer==false is only supported if version < 2.0.0, version %s" % str(version)
 
         security_protocol = self.security_config.security_protocol
-        assert version >= V_0_9_0_0 or security_protocol == SecurityConfig.PLAINTEXT, \
+        assert version.consumer_supports_bootstrap_server() or security_protocol == SecurityConfig.PLAINTEXT, \
             "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))
 
         # These less-frequently used settings can be updated manually after instantiation
@@ -142,7 +142,7 @@ class ConsumerPerformanceService(PerformanceService):
         for key, value in self.args(node.version).items():
             cmd += " --%s %s" % (key, value)
 
-        if node.version >= V_0_9_0_0:
+        if node.version.consumer_supports_bootstrap_server():
             # This is only used for security settings
             cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE
 
@@ -155,7 +155,7 @@ class ConsumerPerformanceService(PerformanceService):
 
     def parse_results(self, line, version):
         parts = line.split(',')
-        if version >= V_0_9_0_0:
+        if version.consumer_supports_bootstrap_server():
             result = {
                 'total_mb': float(parts[2]),
                 'mbps': float(parts[3]),
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py
index 2c7f69a..3cde3ef 100644
--- a/tests/kafkatest/services/performance/end_to_end_latency.py
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -17,7 +17,7 @@ import os
 
 from kafkatest.services.performance import PerformanceService
 from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.version import DEV_BRANCH, V_0_9_0_0
+from kafkatest.version import DEV_BRANCH
 
 
 
@@ -53,7 +53,7 @@ class EndToEndLatencyService(PerformanceService):
 
         security_protocol = self.security_config.security_protocol
 
-        if version < V_0_9_0_0:
+        if not version.consumer_supports_bootstrap_server():
             assert security_protocol == SecurityConfig.PLAINTEXT, \
                 "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))
             assert compression_type == "none", \
@@ -74,15 +74,18 @@ class EndToEndLatencyService(PerformanceService):
     def start_cmd(self, node):
         args = self.args.copy()
         args.update({
-            'zk_connect': self.kafka.zk_connect_setting(),
             'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
             'config_file': EndToEndLatencyService.CONFIG_FILE,
             'kafka_run_class': self.path.script("kafka-run-class.sh", node),
             'java_class_name': self.java_class_name()
         })
+        if not node.version.consumer_supports_bootstrap_server():
+            args.update({
+                'zk_connect': self.kafka.zk_connect_setting(),
+            })
 
         cmd = "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % EndToEndLatencyService.LOG4J_CONFIG
-        if node.version >= V_0_9_0_0:
+        if node.version.consumer_supports_bootstrap_server():
             cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s %(java_class_name)s " % args
             cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(config_file)s" % args
         else:
@@ -102,7 +105,7 @@ class EndToEndLatencyService(PerformanceService):
 
         node.account.create_file(EndToEndLatencyService.LOG4J_CONFIG, log_config)
         client_config = str(self.security_config)
-        if node.version >= V_0_9_0_0:
+        if node.version.consumer_supports_bootstrap_server():
             client_config += "compression_type=%(compression_type)s" % self.args
         node.account.create_file(EndToEndLatencyService.CONFIG_FILE, client_config)
 
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index 3c43698..a990d4f 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -22,7 +22,7 @@ from kafkatest.directory_layout.kafka_path import TOOLS_JAR_NAME, TOOLS_DEPENDAN
 from kafkatest.services.monitor.http import HttpMetricsCollector
 from kafkatest.services.performance import PerformanceService
 from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.version import DEV_BRANCH, V_0_9_0_0
+from kafkatest.version import DEV_BRANCH
 
 
 class ProducerPerformanceService(HttpMetricsCollector, PerformanceService):
@@ -55,7 +55,7 @@ class ProducerPerformanceService(HttpMetricsCollector, PerformanceService):
         self.security_config = kafka.security_config.client_config()
 
         security_protocol = self.security_config.security_protocol
-        assert version >= V_0_9_0_0 or security_protocol == SecurityConfig.PLAINTEXT, \
+        assert version.consumer_supports_bootstrap_server() or security_protocol == SecurityConfig.PLAINTEXT, \
             "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))
 
         self.args = {
diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py
index 4e7aeed..d98dffa 100644
--- a/tests/kafkatest/tests/client/client_compatibility_features_test.py
+++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py
@@ -19,11 +19,12 @@ import errno
 import time
 from random import randint
 
-from ducktape.mark import parametrize
+from ducktape.mark import matrix, parametrize
+from ducktape.mark.resource import cluster
 from ducktape.tests.test import TestContext
 
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from ducktape.tests.test import Test
 from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, V_0_11_0_0, V_0_10_1_0, KafkaVersion
 
@@ -69,7 +70,7 @@ class ClientCompatibilityFeaturesTest(Test):
         """:type test_context: ducktape.tests.test.TestContext"""
         super(ClientCompatibilityFeaturesTest, self).__init__(test_context=test_context)
 
-        self.zk = ZookeeperService(test_context, num_nodes=3)
+        self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None
 
         # Generate a unique topic name
         topic_name = "client_compat_features_topic_%d%d" % (int(time.time()), randint(0, 2147483647))
@@ -81,11 +82,11 @@ class ClientCompatibilityFeaturesTest(Test):
 
     def invoke_compatibility_program(self, features):
         # Run the compatibility test on the first Kafka node.
-        node = self.zk.nodes[0]
+        node = self.kafka.nodes[0]
         cmd = ("%s org.apache.kafka.tools.ClientCompatibilityTest "
                "--bootstrap-server %s "
                "--num-cluster-nodes %d "
-               "--topic %s " % (self.zk.path.script("kafka-run-class.sh", node),
+               "--topic %s " % (self.kafka.path.script("kafka-run-class.sh", node),
                                self.kafka.bootstrap_servers(),
                                len(self.kafka.nodes),
                                list(self.topics.keys())[0]))
@@ -107,7 +108,8 @@ class ClientCompatibilityFeaturesTest(Test):
           self.logger.info("** Command failed.  See %s for log messages." % ssh_log_file)
           raise
 
-    @parametrize(broker_version=str(DEV_BRANCH))
+    @cluster(num_nodes=7)
+    @matrix(broker_version=[str(DEV_BRANCH)], metadata_quorum=quorum.all_non_upgrade)
     @parametrize(broker_version=str(LATEST_0_10_0))
     @parametrize(broker_version=str(LATEST_0_10_1))
     @parametrize(broker_version=str(LATEST_0_10_2))
@@ -122,8 +124,9 @@ class ClientCompatibilityFeaturesTest(Test):
     @parametrize(broker_version=str(LATEST_2_5))
     @parametrize(broker_version=str(LATEST_2_6))
     @parametrize(broker_version=str(LATEST_2_7))
-    def run_compatibility_test(self, broker_version):
-        self.zk.start()
+    def run_compatibility_test(self, broker_version, metadata_quorum=quorum.zk):
+        if self.zk:
+            self.zk.start()
         self.kafka.set_version(KafkaVersion(broker_version))
         self.kafka.start()
         features = get_broker_features(broker_version)
diff --git a/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py b/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py
index 52d41f5..317d0dd 100644
--- a/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py
+++ b/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py
@@ -13,11 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.mark import parametrize
+from ducktape.mark import matrix, parametrize
+from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
@@ -34,7 +35,7 @@ class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
         super(ClientCompatibilityProduceConsumeTest, self).__init__(test_context=test_context)
 
         self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=3)
+        self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None
         self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic:{
                                                                     "partitions": 10,
                                                                     "replication-factor": 2}})
@@ -46,13 +47,15 @@ class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
         self.num_consumers = 1
 
     def setUp(self):
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
 
     def min_cluster_size(self):
         # Override this since we're adding services outside of the constructor
         return super(ClientCompatibilityProduceConsumeTest, self).min_cluster_size() + self.num_producers + self.num_consumers
 
-    @parametrize(broker_version=str(DEV_BRANCH))
+    @cluster(num_nodes=9)
+    @matrix(broker_version=[str(DEV_BRANCH)], metadata_quorum=quorum.all_non_upgrade)
     @parametrize(broker_version=str(LATEST_0_10_0))
     @parametrize(broker_version=str(LATEST_0_10_1))
     @parametrize(broker_version=str(LATEST_0_10_2))
@@ -67,7 +70,7 @@ class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
     @parametrize(broker_version=str(LATEST_2_5))
     @parametrize(broker_version=str(LATEST_2_6))
     @parametrize(broker_version=str(LATEST_2_7))
-    def test_produce_consume(self, broker_version):
+    def test_produce_consume(self, broker_version, metadata_quorum=quorum.zk):
         print("running producer_consumer_compat with broker_version = %s" % broker_version, flush=True)
         self.kafka.set_version(KafkaVersion(broker_version))
         self.kafka.security_protocol = "PLAINTEXT"
diff --git a/tests/kafkatest/tests/client/compression_test.py b/tests/kafkatest/tests/client/compression_test.py
index 23b30ea..37ce52d 100644
--- a/tests/kafkatest/tests/client/compression_test.py
+++ b/tests/kafkatest/tests/client/compression_test.py
@@ -13,12 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.mark import parametrize
+from ducktape.mark import matrix
 from ducktape.utils.util import wait_until
 from ducktape.mark.resource import cluster
 
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
@@ -36,7 +36,7 @@ class CompressionTest(ProduceConsumeValidateTest):
         super(CompressionTest, self).__init__(test_context=test_context)
 
         self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
         self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
                                                                     "partitions": 10,
                                                                     "replication-factor": 1}})
@@ -48,15 +48,16 @@ class CompressionTest(ProduceConsumeValidateTest):
         self.num_consumers = 1
 
     def setUp(self):
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
 
     def min_cluster_size(self):
         # Override this since we're adding services outside of the constructor
         return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers
 
     @cluster(num_nodes=8)
-    @parametrize(compression_types=COMPRESSION_TYPES)
-    def test_compressed_topic(self, compression_types):
+    @matrix(compression_types=[COMPRESSION_TYPES], metadata_quorum=quorum.all_non_upgrade)
+    def test_compressed_topic(self, compression_types, metadata_quorum=quorum.zk):
         """Test produce => consume => validate for compressed topics
         Setup: 1 zk, 1 kafka node, 1 topic with partitions=10, replication-factor=1
 
diff --git a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
index 638a3fc..5beacf2 100644
--- a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
@@ -13,11 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 
 
 from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
-from kafkatest.services.kafka import TopicPartition
+from kafkatest.services.kafka import TopicPartition, quorum
 
 class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
     TOPIC = "test_topic"
@@ -47,7 +48,8 @@ class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
             "Mismatched assignment: %s" % assignment
 
     @cluster(num_nodes=4)
-    def rolling_update_test(self):
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def rolling_update_test(self, metadata_quorum=quorum.zk):
         """
         Verify rolling updates of partition assignment strategies works correctly. In this
         test, we use a rolling restart to change the group's assignment strategy from "range" 
@@ -70,7 +72,7 @@ class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
         consumer.start_node(consumer.nodes[0])
         self.await_all_members(consumer)
         self._verify_range_assignment(consumer)
-        
+
         # now restart the other node and verify that we have switched to round-robin
         consumer.stop_node(consumer.nodes[1])
         consumer.start_node(consumer.nodes[1])
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index 4a9e89d..f417480 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -18,7 +18,7 @@ from ducktape.utils.util import wait_until
 from ducktape.mark.resource import cluster
 
 from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
-from kafkatest.services.kafka import TopicPartition
+from kafkatest.services.kafka import TopicPartition, quorum
 
 import signal
 
@@ -75,7 +75,8 @@ class OffsetValidationTest(VerifiableConsumerTest):
         return consumer
 
     @cluster(num_nodes=7)
-    def test_broker_rolling_bounce(self):
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk):
         """
         Verify correct consumer behavior when the brokers are consecutively restarted.
 
@@ -117,8 +118,8 @@ class OffsetValidationTest(VerifiableConsumerTest):
             (consumer.total_consumed(), consumer.current_position(partition))
 
     @cluster(num_nodes=7)
-    @matrix(clean_shutdown=[True], bounce_mode=["all", "rolling"])
-    def test_consumer_bounce(self, clean_shutdown, bounce_mode):
+    @matrix(clean_shutdown=[True], bounce_mode=["all", "rolling"], metadata_quorum=quorum.all_non_upgrade)
+    def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.zk):
         """
         Verify correct consumer behavior when the consumers in the group are consecutively restarted.
 
@@ -160,8 +161,8 @@ class OffsetValidationTest(VerifiableConsumerTest):
                 (consumer.current_position(partition), consumer.total_consumed())
 
     @cluster(num_nodes=7)
-    @matrix(clean_shutdown=[True], static_membership=[True, False], bounce_mode=["all", "rolling"], num_bounces=[5])
-    def test_static_consumer_bounce(self, clean_shutdown, static_membership, bounce_mode, num_bounces):
+    @matrix(clean_shutdown=[True], static_membership=[True, False], bounce_mode=["all", "rolling"], num_bounces=[5], metadata_quorum=quorum.all_non_upgrade)
+    def test_static_consumer_bounce(self, clean_shutdown, static_membership, bounce_mode, num_bounces, metadata_quorum=quorum.zk):
         """
         Verify correct static consumer behavior when the consumers in the group are restarted. In order to make
         sure the behavior of static members are different from dynamic ones, we take both static and dynamic
@@ -222,8 +223,8 @@ class OffsetValidationTest(VerifiableConsumerTest):
                 (consumer.current_position(partition), consumer.total_consumed())
 
     @cluster(num_nodes=7)
-    @matrix(bounce_mode=["all", "rolling"])
-    def test_static_consumer_persisted_after_rejoin(self, bounce_mode):
+    @matrix(bounce_mode=["all", "rolling"], metadata_quorum=quorum.all_non_upgrade)
+    def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quorum=quorum.zk):
         """
         Verify that the updated member.id(updated_member_id) caused by static member rejoin would be persisted. If not,
         after the brokers rolling bounce, the migrated group coordinator would load the stale persisted member.id and
@@ -253,8 +254,8 @@ class OffsetValidationTest(VerifiableConsumerTest):
         self.rolling_bounce_brokers(consumer, num_bounces=1)
 
     @cluster(num_nodes=10)
-    @matrix(num_conflict_consumers=[1, 2], fencing_stage=["stable", "all"])
-    def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage):
+    @matrix(num_conflict_consumers=[1, 2], fencing_stage=["stable", "all"], metadata_quorum=quorum.all_non_upgrade)
+    def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, metadata_quorum=quorum.zk):
         """
         Verify correct static consumer behavior when there are conflicting consumers with same group.instance.id.
 
@@ -306,8 +307,8 @@ class OffsetValidationTest(VerifiableConsumerTest):
                        )
 
     @cluster(num_nodes=7)
-    @matrix(clean_shutdown=[True], enable_autocommit=[True, False])
-    def test_consumer_failure(self, clean_shutdown, enable_autocommit):
+    @matrix(clean_shutdown=[True], enable_autocommit=[True, False], metadata_quorum=quorum.all_non_upgrade)
+    def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.zk):
         partition = TopicPartition(self.TOPIC, 0)
 
         consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
@@ -353,8 +354,8 @@ class OffsetValidationTest(VerifiableConsumerTest):
                 (consumer.last_commit(partition), consumer.current_position(partition))
 
     @cluster(num_nodes=7)
-    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
-    def test_broker_failure(self, clean_shutdown, enable_autocommit):
+    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False], metadata_quorum=quorum.all_non_upgrade)
+    def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.zk):
         partition = TopicPartition(self.TOPIC, 0)
 
         consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
@@ -390,7 +391,8 @@ class OffsetValidationTest(VerifiableConsumerTest):
                 (consumer.last_commit(partition), consumer.current_position(partition))
 
     @cluster(num_nodes=7)
-    def test_group_consumption(self):
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_group_consumption(self, metadata_quorum=quorum.zk):
         """
         Verifies correct group rebalance behavior as consumers are started and stopped.
         In particular, this test verifies that the partition is readable after every
@@ -442,8 +444,8 @@ class AssignmentValidationTest(VerifiableConsumerTest):
     @cluster(num_nodes=6)
     @matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
                                  "org.apache.kafka.clients.consumer.RoundRobinAssignor",
-                                 "org.apache.kafka.clients.consumer.StickyAssignor"])
-    def test_valid_assignment(self, assignment_strategy):
+                                 "org.apache.kafka.clients.consumer.StickyAssignor"], metadata_quorum=quorum.all_non_upgrade)
+    def test_valid_assignment(self, assignment_strategy, metadata_quorum=quorum.zk):
         """
         Verify assignment strategy correctness: each partition is assigned to exactly
         one consumer instance.
diff --git a/tests/kafkatest/tests/client/message_format_change_test.py b/tests/kafkatest/tests/client/message_format_change_test.py
index 1388330..41e0f95 100644
--- a/tests/kafkatest/tests/client/message_format_change_test.py
+++ b/tests/kafkatest/tests/client/message_format_change_test.py
@@ -12,12 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.mark import parametrize
+from ducktape.mark import matrix
 from ducktape.utils.util import wait_until
 from ducktape.mark.resource import cluster
 
 from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
@@ -32,9 +32,10 @@ class MessageFormatChangeTest(ProduceConsumeValidateTest):
 
     def setUp(self):
         self.topic = "test_topic"
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-            
-        self.zk.start()
+        self.zk = ZookeeperService(self.test_context, num_nodes=1) if quorum.for_test(self.test_context) == quorum.zk else None
+
+        if self.zk:
+            self.zk.start()
 
         # Producer and consumer
         self.producer_throughput = 10000
@@ -58,10 +59,10 @@ class MessageFormatChangeTest(ProduceConsumeValidateTest):
             err_msg="Producer did not produce all messages in reasonable amount of time"))
 
     @cluster(num_nodes=12)
-    @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH))
-    @parametrize(producer_version=str(LATEST_0_10), consumer_version=str(LATEST_0_10))
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
-    def test_compatibility(self, producer_version, consumer_version):
+    @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_0_10)], consumer_version=[str(LATEST_0_10)], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(LATEST_0_9)], metadata_quorum=quorum.all_non_upgrade)
+    def test_compatibility(self, producer_version, consumer_version, metadata_quorum=quorum.zk):
         """ This tests performs the following checks:
         The workload is a mix of 0.9.x, 0.10.x and 0.11.x producers and consumers
         that produce to and consume from a DEV_BRANCH cluster
@@ -81,8 +82,9 @@ class MessageFormatChangeTest(ProduceConsumeValidateTest):
         self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=DEV_BRANCH, topics={self.topic: {
                                                                     "partitions": 3,
                                                                     "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}})
-       
+                                                                    'configs': {"min.insync.replicas": 2}}},
+                                                                    controller_num_nodes_override=1)
+
         self.kafka.start()
         self.logger.info("First format change to 0.9.0")
         self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
diff --git a/tests/kafkatest/tests/client/pluggable_test.py b/tests/kafkatest/tests/client/pluggable_test.py
index a2599d8..36b9172 100644
--- a/tests/kafkatest/tests/client/pluggable_test.py
+++ b/tests/kafkatest/tests/client/pluggable_test.py
@@ -13,8 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 
+from kafkatest.services.kafka import quorum
 from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
 
 class PluggableConsumerTest(VerifiableConsumerTest):
@@ -29,7 +32,9 @@ class PluggableConsumerTest(VerifiableConsumerTest):
                                 self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 },
         })
 
-    def test_start_stop(self):
+    @cluster(num_nodes=4)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_start_stop(self, metadata_quorum=quorum.zk):
         """
         Test that a pluggable VerifiableConsumer module load works
         """
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 580d9f3..1a7f6ab 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -16,12 +16,12 @@
 from ducktape.tests.test import Test
 from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize
+from ducktape.mark import matrix, parametrize
 from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.errors import TimeoutError
 
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.connect import ConnectServiceBase, ConnectStandaloneService, ErrorTolerance
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
@@ -63,7 +63,7 @@ class ConnectStandaloneFileTest(Test):
             'test' : { 'partitions': 1, 'replication-factor': 1 }
         }
 
-        self.zk = ZookeeperService(test_context, self.num_zk)
+        self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None
 
     @cluster(num_nodes=5)
     @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=True)
@@ -71,8 +71,9 @@ class ConnectStandaloneFileTest(Test):
     @parametrize(converter="org.apache.kafka.connect.storage.StringConverter", schemas=None)
     @parametrize(security_protocol=SecurityConfig.PLAINTEXT)
     @cluster(num_nodes=6)
-    @parametrize(security_protocol=SecurityConfig.SASL_SSL)
-    def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True, security_protocol='PLAINTEXT'):
+    @matrix(security_protocol=[SecurityConfig.SASL_SSL], metadata_quorum=quorum.all_non_upgrade)
+    def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True, security_protocol='PLAINTEXT',
+                                  metadata_quorum=quorum.zk):
         """
         Validates basic end-to-end functionality of Connect standalone using the file source and sink converters. Includes
         parameterizations to test different converters (which also test per-connector converter overrides), schema/schemaless
@@ -88,14 +89,15 @@ class ConnectStandaloneFileTest(Test):
 
         self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
                                   security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
-                                  topics=self.topics)
+                                  topics=self.topics, controller_num_nodes_override=self.num_zk)
 
         self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
         self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
         self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC_TEST,
                                                   consumer_timeout_ms=10000)
 
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
         self.kafka.start()
 
         self.source.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-source.properties")])
diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
index 5474112..db8aa1d 100644
--- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
+++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
@@ -12,12 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.mark import parametrize
+from ducktape.mark import matrix, parametrize
 from ducktape.utils.util import wait_until
 from ducktape.mark.resource import cluster
 
 from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.kafka import config_property
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
@@ -33,9 +33,10 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
 
     def setUp(self):
         self.topic = "test_topic"
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-            
-        self.zk.start()
+        self.zk = ZookeeperService(self.test_context, num_nodes=1) if quorum.for_test(self.test_context) == quorum.zk else None
+
+        if self.zk:
+            self.zk.start()
 
         # Producer and consumer
         self.producer_throughput = 10000
@@ -44,39 +45,41 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
         self.messages_per_producer = 1000
 
     @cluster(num_nodes=6)
-    @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
-    @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["none"], timestamp_type=str("LogAppendTime"))
+    @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], compression_types=[["none"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade)
     @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
-    @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("CreateTime"))
-    @parametrize(producer_version=str(LATEST_2_2), consumer_version=str(LATEST_2_2), compression_types=["none"], timestamp_type=str("CreateTime"))
-    @parametrize(producer_version=str(LATEST_2_3), consumer_version=str(LATEST_2_3), compression_types=["none"], timestamp_type=str("CreateTime"))
-    @parametrize(producer_version=str(LATEST_2_4), consumer_version=str(LATEST_2_4), compression_types=["none"], timestamp_type=str("CreateTime"))
-    @parametrize(producer_version=str(LATEST_2_5), consumer_version=str(LATEST_2_5), compression_types=["none"], timestamp_type=str("CreateTime"))
-    @parametrize(producer_version=str(LATEST_2_6), consumer_version=str(LATEST_2_6), compression_types=["none"], timestamp_type=str("CreateTime"))
-    @parametrize(producer_version=str(LATEST_2_7), consumer_version=str(LATEST_2_7), compression_types=["none"], timestamp_type=str("CreateTime"))
-    @parametrize(producer_version=str(LATEST_2_1), consumer_version=str(LATEST_2_1), compression_types=["zstd"], timestamp_type=str("CreateTime"))
-    @parametrize(producer_version=str(LATEST_2_0), consumer_version=str(LATEST_2_0), compression_types=["snappy"], timestamp_type=str("CreateTime"))
-    @parametrize(producer_version=str(LATEST_1_1), consumer_version=str(LATEST_1_1), compression_types=["lz4"], timestamp_type=str("CreateTime"))
-    @parametrize(producer_version=str(LATEST_1_0), consumer_version=str(LATEST_1_0), compression_types=["none"], timestamp_type=str("CreateTime"))
-    @parametrize(producer_version=str(LATEST_0_11_0), consumer_version=str(LATEST_0_11_0), compression_types=["gzip"], timestamp_type=str("CreateTime"))
-    @parametrize(producer_version=str(LATEST_0_10_2), consumer_version=str(LATEST_0_10_2), compression_types=["lz4"], timestamp_type=str("CreateTime"))
-    @parametrize(producer_version=str(LATEST_0_10_1), consumer_version=str(LATEST_0_10_1), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
-    @parametrize(producer_version=str(LATEST_0_10_0), consumer_version=str(LATEST_0_10_0), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["none"], timestamp_type=None)
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=None)
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
+    @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(LATEST_0_9)], compression_types=[["snappy"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_2_2)], consumer_version=[str(LATEST_2_2)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_2_3)], consumer_version=[str(LATEST_2_3)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_2_4)], consumer_version=[str(LATEST_2_4)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_2_5)], consumer_version=[str(LATEST_2_5)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_2_6)], consumer_version=[str(LATEST_2_6)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_2_7)], consumer_version=[str(LATEST_2_7)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_2_1)], consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_2_0)], consumer_version=[str(LATEST_2_0)], compression_types=[["snappy"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_1_1)], consumer_version=[str(LATEST_1_1)], compression_types=[["lz4"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_1_0)], consumer_version=[str(LATEST_1_0)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_0_11_0)], consumer_version=[str(LATEST_0_11_0)], compression_types=[["gzip"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_0_10_2)], consumer_version=[str(LATEST_0_10_2)], compression_types=[["lz4"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_0_10_1)], consumer_version=[str(LATEST_0_10_1)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_0_10_0)], consumer_version=[str(LATEST_0_10_0)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(DEV_BRANCH)], compression_types=[["none"]], timestamp_type=[None], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(DEV_BRANCH)], compression_types=[["snappy"]], timestamp_type=[None], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(LATEST_0_9)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade)
     @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None)
-    def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None):
-
+    def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None, metadata_quorum=quorum.zk):
+        if not new_consumer and metadata_quorum != quorum.zk:
+            raise Exception("ZooKeeper-based consumers are not supported when using a Raft-based metadata quorum")
         self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=DEV_BRANCH, topics={self.topic: {
                                                                     "partitions": 3,
                                                                     "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}})
+                                                                    'configs': {"min.insync.replicas": 2}}},
+                                  controller_num_nodes_override=1)
         for node in self.kafka.nodes:
             if timestamp_type is not None:
                 node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
         self.kafka.start()
-         
+
         self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
                                            self.topic, throughput=self.producer_throughput,
                                            message_validator=is_int,
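
The edits to this file illustrate the recipe repeated across the rest of the patch: create ZooKeeper only when the test run uses the ZK metadata quorum, size the Raft controller quorum explicitly via controller_num_nodes_override, and widen @parametrize cases into @matrix over quorum.all_non_upgrade so each case runs in both modes. Below is a condensed sketch of that recipe using the helpers referenced in this patch; the test class itself is hypothetical and not part of the commit:

    from ducktape.mark import matrix
    from ducktape.mark.resource import cluster
    from ducktape.tests.test import Test

    from kafkatest.services.kafka import KafkaService, quorum
    from kafkatest.services.zookeeper import ZookeeperService

    class ExampleQuorumAwareTest(Test):
        def __init__(self, test_context):
            super(ExampleQuorumAwareTest, self).__init__(test_context)
            # ZooKeeper exists only when this run uses the ZK metadata quorum.
            self.zk = ZookeeperService(test_context, num_nodes=1) \
                if quorum.for_test(test_context) == quorum.zk else None
            # In Raft mode the controller quorum is sized explicitly instead.
            self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk,
                                      controller_num_nodes_override=1)

        def setUp(self):
            if self.zk:
                self.zk.start()
            self.kafka.start()

        @cluster(num_nodes=6)                            # reserve only the nodes the test needs
        @matrix(metadata_quorum=quorum.all_non_upgrade)  # run once per supported quorum mode
        def test_example(self, metadata_quorum=quorum.zk):
            pass  # produce/consume/validate body elided
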
diff --git a/tests/kafkatest/tests/core/consume_bench_test.py b/tests/kafkatest/tests/core/consume_bench_test.py
index e731270..ce08d80 100644
--- a/tests/kafkatest/tests/core/consume_bench_test.py
+++ b/tests/kafkatest/tests/core/consume_bench_test.py
@@ -14,9 +14,10 @@
 # limitations under the License.
 
 import json
-from ducktape.mark import parametrize
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
 from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec
 from kafkatest.services.trogdor.task_spec import TaskSpec
@@ -28,7 +29,7 @@ class ConsumeBenchTest(Test):
     def __init__(self, test_context):
         """:type test_context: ducktape.tests.test.TestContext"""
         super(ConsumeBenchTest, self).__init__(test_context)
-        self.zk = ZookeeperService(test_context, num_nodes=3)
+        self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None
         self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk)
         self.producer_workload_service = ProduceBenchWorkloadService(test_context, self.kafka)
         self.consumer_workload_service = ConsumeBenchWorkloadService(test_context, self.kafka)
@@ -41,13 +42,15 @@ class ConsumeBenchTest(Test):
 
     def setUp(self):
         self.trogdor.start()
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
         self.kafka.start()
 
     def teardown(self):
         self.trogdor.stop()
         self.kafka.stop()
-        self.zk.stop()
+        if self.zk:
+            self.zk.stop()
 
     def produce_messages(self, topics, max_messages=10000):
         produce_spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
@@ -64,9 +67,10 @@ class ConsumeBenchTest(Test):
         produce_workload.wait_for_done(timeout_sec=180)
         self.logger.debug("Produce workload finished")
 
-    @parametrize(topics=["consume_bench_topic[0-5]"]) # topic subscription
-    @parametrize(topics=["consume_bench_topic[0-5]:[0-4]"])  # manual topic assignment
-    def test_consume_bench(self, topics):
+    @cluster(num_nodes=10)
+    @matrix(topics=[["consume_bench_topic[0-5]"]], metadata_quorum=quorum.all_non_upgrade) # topic subscription
+    @matrix(topics=[["consume_bench_topic[0-5]:[0-4]"]], metadata_quorum=quorum.all_non_upgrade)  # manual topic assignment
+    def test_consume_bench(self, topics, metadata_quorum=quorum.zk):
         """
         Runs a ConsumeBench workload to consume messages
         """
@@ -86,7 +90,9 @@ class ConsumeBenchTest(Test):
         tasks = self.trogdor.tasks()
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
 
-    def test_single_partition(self):
+    @cluster(num_nodes=10)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_single_partition(self, metadata_quorum=quorum.zk):
         """
         Run a ConsumeBench against a single partition
         """
@@ -107,7 +113,9 @@ class ConsumeBenchTest(Test):
         tasks = self.trogdor.tasks()
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
 
-    def test_multiple_consumers_random_group_topics(self):
+    @cluster(num_nodes=10)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_multiple_consumers_random_group_topics(self, metadata_quorum=quorum.zk):
         """
         Runs multiple consumers to read messages from topics.
         Since a consumerGroup isn't specified, each consumer should read from all topics independently
@@ -129,7 +137,9 @@ class ConsumeBenchTest(Test):
         tasks = self.trogdor.tasks()
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
 
-    def test_two_consumers_specified_group_topics(self):
+    @cluster(num_nodes=10)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_two_consumers_specified_group_topics(self, metadata_quorum=quorum.zk):
         """
         Runs two consumers in the same consumer group to read messages from topics.
         Since a consumerGroup is specified, each consumer should dynamically get assigned a partition from group
@@ -152,7 +162,9 @@ class ConsumeBenchTest(Test):
         tasks = self.trogdor.tasks()
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
 
-    def test_multiple_consumers_random_group_partitions(self):
+    @cluster(num_nodes=10)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_multiple_consumers_random_group_partitions(self, metadata_quorum=quorum.zk):
         """
         Runs multiple consumers to read messages from specific partitions.
         Since a consumerGroup isn't specified, each consumer will get assigned a random group
@@ -175,7 +187,9 @@ class ConsumeBenchTest(Test):
         tasks = self.trogdor.tasks()
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
 
-    def test_multiple_consumers_specified_group_partitions_should_raise(self):
+    @cluster(num_nodes=10)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum=quorum.zk):
         """
         Runs multiple consumers in the same group to read messages from specific partitions.
         It is an invalid configuration to provide a consumer group and specific partitions.
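
Every benchmark test in this file now carries an explicit @cluster size and a metadata_quorum matrix, but the Trogdor workflow they share is unchanged. A minimal sketch of that shared lifecycle, assuming a workload spec built elsewhere and the trogdor service from the constructor (the helper function is illustrative, not part of the commit):

    import json

    def run_trogdor_workload(trogdor, spec, logger, timeout_sec=600):
        # Submit the workload spec to the Trogdor coordinator and block until it completes.
        task = trogdor.create_task("workload1", spec)
        task.wait_for_done(timeout_sec=timeout_sec)
        # Dump the coordinator's view of all tasks, as the tests above do after each run.
        logger.info("TASKS: %s\n" % json.dumps(trogdor.tasks(), sort_keys=True, indent=2))
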
diff --git a/tests/kafkatest/tests/core/consumer_group_command_test.py b/tests/kafkatest/tests/core/consumer_group_command_test.py
index 871e276..f81eec8 100644
--- a/tests/kafkatest/tests/core/consumer_group_command_test.py
+++ b/tests/kafkatest/tests/core/consumer_group_command_test.py
@@ -20,7 +20,7 @@ from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
 
@@ -45,16 +45,18 @@ class ConsumerGroupCommandTest(Test):
         self.topics = {
             TOPIC: {'partitions': 1, 'replication-factor': 1}
         }
-        self.zk = ZookeeperService(test_context, self.num_zk)
+        self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None
 
     def setUp(self):
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
 
     def start_kafka(self, security_protocol, interbroker_security_protocol):
         self.kafka = KafkaService(
             self.test_context, self.num_brokers,
             self.zk, security_protocol=security_protocol,
-            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
+            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics,
+            controller_num_nodes_override=self.num_zk)
         self.kafka.start()
 
     def start_consumer(self):
@@ -88,8 +90,8 @@ class ConsumerGroupCommandTest(Test):
         self.consumer.stop()
 
     @cluster(num_nodes=3)
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_list_consumer_groups(self, security_protocol='PLAINTEXT'):
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all_non_upgrade)
+    def test_list_consumer_groups(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk):
         """
         Tests if ConsumerGroupCommand is listing correct consumer groups
         :return: None
@@ -97,8 +99,8 @@ class ConsumerGroupCommandTest(Test):
         self.setup_and_verify(security_protocol)
 
     @cluster(num_nodes=3)
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_describe_consumer_group(self, security_protocol='PLAINTEXT'):
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all_non_upgrade)
+    def test_describe_consumer_group(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk):
         """
         Tests if ConsumerGroupCommand is describing a consumer group correctly
         :return: None
diff --git a/tests/kafkatest/tests/core/delegation_token_test.py b/tests/kafkatest/tests/core/delegation_token_test.py
index feb5935..5fe8d12 100644
--- a/tests/kafkatest/tests/core/delegation_token_test.py
+++ b/tests/kafkatest/tests/core/delegation_token_test.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
 from kafkatest.services.kafka import config_property, KafkaService
@@ -109,6 +110,7 @@ client.id=console-consumer
 
         self.delegation_tokens.renew_delegation_token(dt["hmac"], new_expirydate_ms)
 
+    @cluster(num_nodes=5)
     def test_delegation_token_lifecycle(self):
         self.kafka.start()
         self.delegation_tokens = DelegationTokens(self.kafka, self.test_context)
diff --git a/tests/kafkatest/tests/core/downgrade_test.py b/tests/kafkatest/tests/core/downgrade_test.py
index beb103a..489ae7c 100644
--- a/tests/kafkatest/tests/core/downgrade_test.py
+++ b/tests/kafkatest/tests/core/downgrade_test.py
@@ -53,7 +53,7 @@ class TestDowngrade(EndToEndTest):
             self.wait_until_rejoin()
 
     def setup_services(self, kafka_version, compression_types, security_protocol, static_membership):
-        self.create_zookeeper()
+        self.create_zookeeper_if_necessary()
         self.zk.start()
 
         self.create_kafka(num_nodes=3,
diff --git a/tests/kafkatest/tests/core/fetch_from_follower_test.py b/tests/kafkatest/tests/core/fetch_from_follower_test.py
index ef37728..fab5cfa 100644
--- a/tests/kafkatest/tests/core/fetch_from_follower_test.py
+++ b/tests/kafkatest/tests/core/fetch_from_follower_test.py
@@ -16,10 +16,11 @@
 import time
 from collections import defaultdict
 
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 
 from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.monitor.jmx import JmxTool
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
@@ -36,7 +37,7 @@ class FetchFromFollowerTest(ProduceConsumeValidateTest):
         super(FetchFromFollowerTest, self).__init__(test_context=test_context)
         self.jmx_tool = JmxTool(test_context, jmx_poll_ms=100)
         self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
         self.kafka = KafkaService(test_context,
                                   num_nodes=3,
                                   zk=self.zk,
@@ -53,7 +54,8 @@ class FetchFromFollowerTest(ProduceConsumeValidateTest):
                                       1: [("broker.rack", "rack-a")],
                                       2: [("broker.rack", "rack-b")],
                                       3: [("broker.rack", "rack-c")]
-                                  })
+                                  },
+                                  controller_num_nodes_override=1)
 
         self.producer_throughput = 1000
         self.num_producers = 1
@@ -63,11 +65,13 @@ class FetchFromFollowerTest(ProduceConsumeValidateTest):
         return super(FetchFromFollowerTest, self).min_cluster_size() + self.num_producers * 2 + self.num_consumers * 2
 
     def setUp(self):
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
         self.kafka.start()
 
     @cluster(num_nodes=9)
-    def test_consumer_preferred_read_replica(self):
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_consumer_preferred_read_replica(self, metadata_quorum=quorum.zk):
         """
         This test starts up brokers with "broker.rack" and "replica.selector.class" configurations set. The replica
         selector is set to the rack-aware implementation. One of the brokers has a different rack than the other two.
diff --git a/tests/kafkatest/tests/core/get_offset_shell_test.py b/tests/kafkatest/tests/core/get_offset_shell_test.py
index 3f226c1..b24c5ac 100644
--- a/tests/kafkatest/tests/core/get_offset_shell_test.py
+++ b/tests/kafkatest/tests/core/get_offset_shell_test.py
@@ -16,11 +16,12 @@
 
 from ducktape.utils.util import wait_until
 from ducktape.tests.test import Test
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.console_consumer import ConsoleConsumer
 
 MAX_MESSAGES = 100
@@ -63,10 +64,11 @@ class GetOffsetShellTest(Test):
             TOPIC_TEST_TOPIC_PARTITIONS2: {'partitions': 2, 'replication-factor': REPLICATION_FACTOR}
         }
 
-        self.zk = ZookeeperService(test_context, self.num_zk)
+        self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None
 
     def setUp(self):
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
 
     def start_kafka(self, security_protocol, interbroker_security_protocol):
         self.kafka = KafkaService(
@@ -103,7 +105,8 @@ class GetOffsetShellTest(Test):
         return sum
 
     @cluster(num_nodes=3)
-    def test_get_offset_shell_topic_name(self, security_protocol='PLAINTEXT'):
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_get_offset_shell_topic_name(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk):
         """
         Tests if GetOffsetShell handles --topic argument with a simple name correctly
         :return: None
@@ -116,7 +119,8 @@ class GetOffsetShellTest(Test):
                    timeout_sec=10, err_msg="Timed out waiting to reach expected offset.")
 
     @cluster(num_nodes=4)
-    def test_get_offset_shell_topic_pattern(self, security_protocol='PLAINTEXT'):
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_get_offset_shell_topic_pattern(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk):
         """
         Tests if GetOffsetShell handles --topic argument with a pattern correctly
         :return: None
@@ -130,7 +134,8 @@ class GetOffsetShellTest(Test):
                    timeout_sec=10, err_msg="Timed out waiting to reach expected offset.")
 
     @cluster(num_nodes=3)
-    def test_get_offset_shell_partitions(self, security_protocol='PLAINTEXT'):
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_get_offset_shell_partitions(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk):
         """
         Tests if GetOffsetShell handles --partitions argument correctly
         :return: None
@@ -151,7 +156,8 @@ class GetOffsetShellTest(Test):
                    timeout_sec=10, err_msg="Timed out waiting to reach expected offset.")
 
     @cluster(num_nodes=4)
-    def test_get_offset_shell_topic_partitions(self, security_protocol='PLAINTEXT'):
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_get_offset_shell_topic_partitions(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk):
         """
         Tests if GetOffsetShell handles --topic-partitions argument correctly
         :return: None
@@ -202,7 +208,8 @@ class GetOffsetShellTest(Test):
         assert 0 == filtered_partitions.count("%s:%s" % (TOPIC_TEST_TOPIC_PARTITIONS2, 1))
 
     @cluster(num_nodes=4)
-    def test_get_offset_shell_internal_filter(self, security_protocol='PLAINTEXT'):
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_get_offset_shell_internal_filter(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk):
         """
         Tests if GetOffsetShell handles --exclude-internal-topics flag correctly
         :return: None
diff --git a/tests/kafkatest/tests/core/group_mode_transactions_test.py b/tests/kafkatest/tests/core/group_mode_transactions_test.py
index 141c613..e9638d5 100644
--- a/tests/kafkatest/tests/core/group_mode_transactions_test.py
+++ b/tests/kafkatest/tests/core/group_mode_transactions_test.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.transactional_message_copier import TransactionalMessageCopier
@@ -25,6 +25,7 @@ from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 
+import time
 
 class GroupModeTransactionsTest(Test):
     """Essentially testing the same functionality as TransactionsTest by transactionally copying data
@@ -60,13 +61,14 @@ class GroupModeTransactionsTest(Test):
         self.progress_timeout_sec = 60
         self.consumer_group = "grouped-transactions-test-consumer-group"
 
-        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
         self.kafka = KafkaService(test_context,
                                   num_nodes=self.num_brokers,
-                                  zk=self.zk)
+                                  zk=self.zk, controller_num_nodes_override=1)
 
     def setUp(self):
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
 
     def seed_messages(self, topic, num_seed_messages):
         seed_timeout_sec = 10000
@@ -95,10 +97,17 @@ class GroupModeTransactionsTest(Test):
                 self.kafka.restart_node(node, clean_shutdown = True)
             else:
                 self.kafka.stop_node(node, clean_shutdown = False)
-                wait_until(lambda: len(self.kafka.pids(node)) == 0 and not self.kafka.is_registered(node),
-                           timeout_sec=self.kafka.zk_session_timeout + 5,
-                           err_msg="Failed to see timely deregistration of \
-                           hard-killed broker %s" % str(node.account))
+                gracePeriodSecs = 5
+                if self.zk:
+                    wait_until(lambda: len(self.kafka.pids(node)) == 0 and not self.kafka.is_registered(node),
+                               timeout_sec=self.kafka.zk_session_timeout + gracePeriodSecs,
+                               err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(node.account))
+                else:
+                    brokerSessionTimeoutSecs = 18
+                    wait_until(lambda: len(self.kafka.pids(node)) == 0,
+                               timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs,
+                               err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account))
+                    time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)
                 self.kafka.start_node(node)
 
     def create_and_start_message_copier(self, input_topic, output_topic, transactional_id):
@@ -260,8 +269,8 @@ class GroupModeTransactionsTest(Test):
 
     @cluster(num_nodes=10)
     @matrix(failure_mode=["hard_bounce", "clean_bounce"],
-            bounce_target=["brokers", "clients"])
-    def test_transactions(self, failure_mode, bounce_target):
+            bounce_target=["brokers", "clients"], metadata_quorum=quorum.all_non_upgrade)
+    def test_transactions(self, failure_mode, bounce_target, metadata_quorum=quorum.zk):
         security_protocol = 'PLAINTEXT'
         self.kafka.security_protocol = security_protocol
         self.kafka.interbroker_security_protocol = security_protocol
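
The hard-bounce branch above (duplicated later in replication_test.py and transactions_test.py) is the one place where the two quorum modes need different waits: with ZooKeeper the test can watch the broker's registration expire, while under Raft it can only wait out broker.session.timeout.ms. A condensed sketch of that logic as a standalone helper, assuming a kafka service exposing pids()/is_registered() and an optional zk service (the helper name is illustrative, not part of the commit):

    import time
    from ducktape.utils.util import wait_until

    def await_hard_killed_broker(kafka, zk, node, grace_period_secs=5):
        if zk:
            # ZK mode: wait for the process to die and the ephemeral registration to expire.
            wait_until(lambda: len(kafka.pids(node)) == 0 and not kafka.is_registered(node),
                       timeout_sec=kafka.zk_session_timeout + grace_period_secs,
                       err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(node.account))
        else:
            # Raft mode: no ZK session to expire, so wait for the process to disappear and
            # then sit out the default broker.session.timeout.ms (18 seconds) plus a grace period.
            broker_session_timeout_secs = 18
            wait_until(lambda: len(kafka.pids(node)) == 0,
                       timeout_sec=broker_session_timeout_secs + grace_period_secs,
                       err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account))
            time.sleep(broker_session_timeout_secs + grace_period_secs)
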
diff --git a/tests/kafkatest/tests/core/produce_bench_test.py b/tests/kafkatest/tests/core/produce_bench_test.py
index a316520..734dfb5 100644
--- a/tests/kafkatest/tests/core/produce_bench_test.py
+++ b/tests/kafkatest/tests/core/produce_bench_test.py
@@ -14,19 +14,20 @@
 # limitations under the License.
 
 import json
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
 from kafkatest.services.trogdor.task_spec import TaskSpec
 from kafkatest.services.trogdor.trogdor import TrogdorService
 from kafkatest.services.zookeeper import ZookeeperService
 
-
 class ProduceBenchTest(Test):
     def __init__(self, test_context):
         """:type test_context: ducktape.tests.test.TestContext"""
         super(ProduceBenchTest, self).__init__(test_context)
-        self.zk = ZookeeperService(test_context, num_nodes=3)
+        self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None
         self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk)
         self.workload_service = ProduceBenchWorkloadService(test_context, self.kafka)
         self.trogdor = TrogdorService(context=self.test_context,
@@ -36,15 +37,19 @@ class ProduceBenchTest(Test):
 
     def setUp(self):
         self.trogdor.start()
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
         self.kafka.start()
 
     def teardown(self):
         self.trogdor.stop()
         self.kafka.stop()
-        self.zk.stop()
+        if self.zk:
+            self.zk.stop()
 
-    def test_produce_bench(self):
+    @cluster(num_nodes=8)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_produce_bench(self, metadata_quorum=quorum.zk):
         spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
                                         self.workload_service.producer_node,
                                         self.workload_service.bootstrap_servers,
@@ -60,7 +65,9 @@ class ProduceBenchTest(Test):
         tasks = self.trogdor.tasks()
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
 
-    def test_produce_bench_transactions(self):
+    @cluster(num_nodes=8)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_produce_bench_transactions(self, metadata_quorum=quorum.zk):
         spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
                                         self.workload_service.producer_node,
                                         self.workload_service.bootstrap_servers,
diff --git a/tests/kafkatest/tests/core/replica_scale_test.py b/tests/kafkatest/tests/core/replica_scale_test.py
index 8541d39..f8d0c83 100644
--- a/tests/kafkatest/tests/core/replica_scale_test.py
+++ b/tests/kafkatest/tests/core/replica_scale_test.py
@@ -14,13 +14,13 @@
 # limitations under the License.
 
 from ducktape.mark.resource import cluster
-from ducktape.mark import parametrize
+from ducktape.mark import matrix
 from ducktape.tests.test import Test
 
 from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
 from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec
 from kafkatest.services.trogdor.task_spec import TaskSpec
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.trogdor.trogdor import TrogdorService
 from kafkatest.services.zookeeper import ZookeeperService
 
@@ -31,11 +31,12 @@ class ReplicaScaleTest(Test):
     def __init__(self, test_context):
         super(ReplicaScaleTest, self).__init__(test_context=test_context)
         self.test_context = test_context
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(self.test_context, num_nodes=8, zk=self.zk)
+        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
+        self.kafka = KafkaService(self.test_context, num_nodes=8, zk=self.zk, controller_num_nodes_override=1)
 
     def setUp(self):
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
         self.kafka.start()
 
     def teardown(self):
@@ -43,11 +44,12 @@ class ReplicaScaleTest(Test):
         for node in self.kafka.nodes:
             self.kafka.stop_node(node, clean_shutdown=False, timeout_sec=60)
         self.kafka.stop()
-        self.zk.stop()
+        if self.zk:
+            self.zk.stop()
 
     @cluster(num_nodes=12)
-    @parametrize(topic_count=50, partition_count=34, replication_factor=3)
-    def test_produce_consume(self, topic_count, partition_count, replication_factor):
+    @matrix(topic_count=[50], partition_count=[34], replication_factor=[3], metadata_quorum=quorum.all_non_upgrade)
+    def test_produce_consume(self, topic_count, partition_count, replication_factor, metadata_quorum=quorum.zk):
         topics_create_start_time = time.time()
         for i in range(topic_count):
             topic = "replicas_produce_consume_%d" % i
@@ -101,8 +103,8 @@ class ReplicaScaleTest(Test):
         trogdor.stop()
 
     @cluster(num_nodes=12)
-    @parametrize(topic_count=50, partition_count=34, replication_factor=3)
-    def test_clean_bounce(self, topic_count, partition_count, replication_factor):
+    @matrix(topic_count=[50], partition_count=[34], replication_factor=[3], metadata_quorum=quorum.all_non_upgrade)
+    def test_clean_bounce(self, topic_count, partition_count, replication_factor, metadata_quorum=quorum.zk):
         topics_create_start_time = time.time()
         for i in range(topic_count):
             topic = "topic-%04d" % i
diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py
index 01ef34f..a0c0156 100644
--- a/tests/kafkatest/tests/core/replication_test.py
+++ b/tests/kafkatest/tests/core/replication_test.py
@@ -19,9 +19,11 @@ from ducktape.mark import matrix
 from ducktape.mark import parametrize
 from ducktape.mark.resource import cluster
 
+from kafkatest.services.kafka import quorum
 from kafkatest.tests.end_to_end import EndToEndTest
 
 import signal
+import time
 
 def broker_node(test, broker_type):
     """ Discover node of requested type. For leader type, discovers leader for our topic and partition 0
@@ -63,10 +65,19 @@ def hard_bounce(test, broker_type):
 
         # Since this is a hard kill, we need to make sure the process is down and that
         # zookeeper has registered the loss by expiring the broker's session timeout.
-
-        wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node),
-                   timeout_sec=test.kafka.zk_session_timeout + 5,
-                   err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account))
+        # Or, for a Raft-based quorum, we simply wait at least 18 seconds (the default for broker.session.timeout.ms)
+
+        gracePeriodSecs = 5
+        if test.zk:
+            wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node),
+                       timeout_sec=test.kafka.zk_session_timeout + gracePeriodSecs,
+                       err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account))
+        else:
+            brokerSessionTimeoutSecs = 18
+            wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0,
+                       timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs,
+                       err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(prev_broker_node.account))
+            time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)
 
         test.kafka.start_node(prev_broker_node)
 
@@ -98,11 +109,11 @@ class ReplicationTest(EndToEndTest):
         "replication-factor": 3,
         "configs": {"min.insync.replicas": 2}
     }
- 
+
     def __init__(self, test_context):
         """:type test_context: ducktape.tests.test.TestContext"""
         super(ReplicationTest, self).__init__(test_context=test_context, topic_config=self.TOPIC_CONFIG)
- 
+
     def min_cluster_size(self):
         """Override this since we're adding services outside of the constructor"""
         return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
@@ -111,29 +122,34 @@ class ReplicationTest(EndToEndTest):
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["leader"],
             security_protocol=["PLAINTEXT"],
-            enable_idempotence=[True])
+            enable_idempotence=[True],
+            metadata_quorum=quorum.all_non_upgrade)
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["leader"],
-            security_protocol=["PLAINTEXT", "SASL_SSL"])
+            security_protocol=["PLAINTEXT", "SASL_SSL"],
+            metadata_quorum=quorum.all_non_upgrade)
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["controller"],
             security_protocol=["PLAINTEXT", "SASL_SSL"])
     @matrix(failure_mode=["hard_bounce"],
             broker_type=["leader"],
-            security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"], interbroker_sasl_mechanism=["PLAIN", "GSSAPI"])
+            security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"], interbroker_sasl_mechanism=["PLAIN", "GSSAPI"],
+            metadata_quorum=quorum.all_non_upgrade)
     @parametrize(failure_mode="hard_bounce",
             broker_type="leader",
             security_protocol="SASL_SSL", client_sasl_mechanism="SCRAM-SHA-256", interbroker_sasl_mechanism="SCRAM-SHA-512")
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
-            security_protocol=["PLAINTEXT"], broker_type=["leader"], compression_type=["gzip"], tls_version=["TLSv1.2", "TLSv1.3"])
+            security_protocol=["PLAINTEXT"], broker_type=["leader"], compression_type=["gzip"], tls_version=["TLSv1.2", "TLSv1.3"],
+            metadata_quorum=quorum.all_non_upgrade)
     def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type,
                                              client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI",
-                                             compression_type=None, enable_idempotence=False, tls_version=None):
+                                             compression_type=None, enable_idempotence=False, tls_version=None,
+                                             metadata_quorum=quorum.zk):
         """Replication tests.
         These tests verify that replication provides simple durability guarantees by checking that data acked by
         brokers is still available for consumption in the face of various failure scenarios.
 
-        Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
+        Setup: 1 zk/Raft-based controller, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
 
             - Produce messages in the background
             - Consume messages in the background
@@ -142,15 +158,19 @@ class ReplicationTest(EndToEndTest):
             - Validate that every acked message was consumed
         """
 
-        self.create_zookeeper()
-        self.zk.start()
+        if broker_type == "controller" and metadata_quorum != quorum.zk:
+            raise Exception("There is no controller broker when using a Raft-based metadata quorum")
+        self.create_zookeeper_if_necessary()
+        if self.zk:
+            self.zk.start()
 
         self.create_kafka(num_nodes=3,
                           security_protocol=security_protocol,
                           interbroker_security_protocol=security_protocol,
                           client_sasl_mechanism=client_sasl_mechanism,
                           interbroker_sasl_mechanism=interbroker_sasl_mechanism,
-                          tls_version=tls_version)
+                          tls_version=tls_version,
+                          controller_num_nodes_override=1)
         self.kafka.start()
 
         compression_types = None if not compression_type else [compression_type]
diff --git a/tests/kafkatest/tests/core/round_trip_fault_test.py b/tests/kafkatest/tests/core/round_trip_fault_test.py
index a0ce5ae..b9085cb 100644
--- a/tests/kafkatest/tests/core/round_trip_fault_test.py
+++ b/tests/kafkatest/tests/core/round_trip_fault_test.py
@@ -14,10 +14,12 @@
 # limitations under the License.
 
 import time
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 from kafkatest.services.trogdor.network_partition_fault_spec import NetworkPartitionFaultSpec
 from kafkatest.services.trogdor.degraded_network_fault_spec import DegradedNetworkFaultSpec
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.trogdor.process_stop_fault_spec import ProcessStopFaultSpec
 from kafkatest.services.trogdor.round_trip_workload import RoundTripWorkloadService, RoundTripWorkloadSpec
 from kafkatest.services.trogdor.task_spec import TaskSpec
@@ -31,11 +33,17 @@ class RoundTripFaultTest(Test):
     def __init__(self, test_context):
         """:type test_context: ducktape.tests.test.TestContext"""
         super(RoundTripFaultTest, self).__init__(test_context)
-        self.zk = ZookeeperService(test_context, num_nodes=3)
+        self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None
         self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk)
         self.workload_service = RoundTripWorkloadService(test_context, self.kafka)
+        if quorum.for_test(test_context) == quorum.zk:
+            trogdor_client_services = [self.zk, self.kafka, self.workload_service]
+        elif quorum.for_test(test_context) == quorum.remote_raft:
+            trogdor_client_services = [self.kafka.controller_quorum, self.kafka, self.workload_service]
+        else: # co-located case, which we currently don't test but handle here for completeness in case we do test it
+            trogdor_client_services = [self.kafka, self.workload_service]
         self.trogdor = TrogdorService(context=self.test_context,
-                                      client_services=[self.zk, self.kafka, self.workload_service])
+                                      client_services=trogdor_client_services)
         topic_name = "round_trip_topic%d" % RoundTripFaultTest.topic_name_index
         RoundTripFaultTest.topic_name_index = RoundTripFaultTest.topic_name_index + 1
         active_topics={topic_name : {"partitionAssignments":{"0": [0,1,2]}}}
@@ -47,24 +55,38 @@ class RoundTripFaultTest(Test):
                                      active_topics=active_topics)
 
     def setUp(self):
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
         self.kafka.start()
         self.trogdor.start()
 
     def teardown(self):
         self.trogdor.stop()
         self.kafka.stop()
-        self.zk.stop()
+        if self.zk:
+            self.zk.stop()
 
-    def test_round_trip_workload(self):
+    def remote_quorum_nodes(self):
+        if quorum.for_test(self.test_context) == quorum.zk:
+            return self.zk.nodes
+        elif quorum.for_test(self.test_context) == quorum.remote_raft:
+            return self.kafka.controller_quorum.nodes
+        else: # co-located case, which we currently don't test but handle here for completeness in case we do test it
+            return []
+
+    @cluster(num_nodes=9)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_round_trip_workload(self, metadata_quorum=quorum.zk):
         workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
         workload1.wait_for_done(timeout_sec=600)
 
-    def test_round_trip_workload_with_broker_partition(self):
+    @cluster(num_nodes=9)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_round_trip_workload_with_broker_partition(self, metadata_quorum=quorum.zk):
         workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
         time.sleep(2)
         part1 = [self.kafka.nodes[0]]
-        part2 = self.kafka.nodes[1:] + [self.workload_service.nodes[0]] + self.zk.nodes
+        part2 = self.kafka.nodes[1:] + [self.workload_service.nodes[0]] + self.remote_quorum_nodes()
         partition1_spec = NetworkPartitionFaultSpec(0, TaskSpec.MAX_DURATION_MS,
                                                     [part1, part2])
         partition1 = self.trogdor.create_task("partition1", partition1_spec)
@@ -72,7 +94,9 @@ class RoundTripFaultTest(Test):
         partition1.stop()
         partition1.wait_for_done()
 
-    def test_produce_consume_with_broker_pause(self):
+    @cluster(num_nodes=9)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_produce_consume_with_broker_pause(self, metadata_quorum=quorum.zk):
         workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
         time.sleep(2)
         stop1_spec = ProcessStopFaultSpec(0, TaskSpec.MAX_DURATION_MS, [self.kafka.nodes[0]],
@@ -83,22 +107,26 @@ class RoundTripFaultTest(Test):
         stop1.wait_for_done()
         self.kafka.stop_node(self.kafka.nodes[0], False)
 
-    def test_produce_consume_with_client_partition(self):
+    @cluster(num_nodes=9)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_produce_consume_with_client_partition(self, metadata_quorum=quorum.zk):
         workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
         time.sleep(2)
         part1 = [self.workload_service.nodes[0]]
-        part2 = self.kafka.nodes + self.zk.nodes
+        part2 = self.kafka.nodes + self.remote_quorum_nodes()
         partition1_spec = NetworkPartitionFaultSpec(0, 60000, [part1, part2])
         stop1 = self.trogdor.create_task("stop1", partition1_spec)
         workload1.wait_for_done(timeout_sec=600)
         stop1.stop()
         stop1.wait_for_done()
 
-    def test_produce_consume_with_latency(self):
+    @cluster(num_nodes=9)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_produce_consume_with_latency(self, metadata_quorum=quorum.zk):
         workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
         time.sleep(2)
         spec = DegradedNetworkFaultSpec(0, 60000)
-        for node in self.kafka.nodes + self.zk.nodes:
+        for node in self.kafka.nodes + self.remote_quorum_nodes():
             spec.add_node_spec(node.name, "eth0", latencyMs=100, rateLimitKbit=3000)
         slow1 = self.trogdor.create_task("slow1", spec)
         workload1.wait_for_done(timeout_sec=600)
diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py
index 7339873..5d1d886 100644
--- a/tests/kafkatest/tests/core/security_test.py
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -14,11 +14,12 @@
 # limitations under the License.
 
 from ducktape.cluster.remoteaccount import RemoteCommandError
-from ducktape.mark import parametrize
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 from ducktape.errors import TimeoutError
 
+from kafkatest.services.kafka import quorum
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.services.security.security_config import SslStores
 from kafkatest.tests.end_to_end import EndToEndTest
@@ -57,9 +58,9 @@ class SecurityTest(EndToEndTest):
         return True
 
     @cluster(num_nodes=7)
-    @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='SSL')
-    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol):
+    @matrix(security_protocol=['PLAINTEXT'], interbroker_security_protocol=['SSL'], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], metadata_quorum=quorum.all_non_upgrade)
+    def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol, metadata_quorum=quorum.zk):
         """
         Test that invalid hostname in certificate results in connection failures.
         When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure.
@@ -71,8 +72,9 @@ class SecurityTest(EndToEndTest):
         SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir,
                                                   valid_hostname=True)
 
-        self.create_zookeeper()
-        self.zk.start()
+        self.create_zookeeper_if_necessary()
+        if self.zk:
+            self.zk.start()
 
         self.create_kafka(security_protocol=security_protocol,
                           interbroker_security_protocol=interbroker_security_protocol)
diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py
index ad8d0a7..2891e70 100644
--- a/tests/kafkatest/tests/core/transactions_test.py
+++ b/tests/kafkatest/tests/core/transactions_test.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.transactional_message_copier import TransactionalMessageCopier
@@ -25,6 +25,7 @@ from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 
+import time
 
 class TransactionsTest(Test):
     """Tests transactions by transactionally copying data from a source topic to
@@ -58,13 +59,15 @@ class TransactionsTest(Test):
         self.progress_timeout_sec = 60
         self.consumer_group = "transactions-test-consumer-group"
 
-        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
         self.kafka = KafkaService(test_context,
                                   num_nodes=self.num_brokers,
-                                  zk=self.zk)
+                                  zk=self.zk,
+                                  controller_num_nodes_override=1)
 
     def setUp(self):
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
 
     def seed_messages(self, topic, num_seed_messages):
         seed_timeout_sec = 10000
@@ -92,10 +95,17 @@ class TransactionsTest(Test):
                 self.kafka.restart_node(node, clean_shutdown = True)
             else:
                 self.kafka.stop_node(node, clean_shutdown = False)
-                wait_until(lambda: len(self.kafka.pids(node)) == 0 and not self.kafka.is_registered(node),
-                           timeout_sec=self.kafka.zk_session_timeout + 5,
-                           err_msg="Failed to see timely deregistration of \
-                           hard-killed broker %s" % str(node.account))
+                gracePeriodSecs = 5
+                if self.zk:
+                    wait_until(lambda: len(self.kafka.pids(node)) == 0 and not self.kafka.is_registered(node),
+                               timeout_sec=self.kafka.zk_session_timeout + gracePeriodSecs,
+                               err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(node.account))
+                else:
+                    brokerSessionTimeoutSecs = 18
+                    wait_until(lambda: len(self.kafka.pids(node)) == 0,
+                               timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs,
+                               err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account))
+                    time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)
                 self.kafka.start_node(node)
 
     def create_and_start_message_copier(self, input_topic, input_partition, output_topic, transactional_id, use_group_metadata):
@@ -234,8 +244,9 @@ class TransactionsTest(Test):
     @matrix(failure_mode=["hard_bounce", "clean_bounce"],
             bounce_target=["brokers", "clients"],
             check_order=[True, False],
-            use_group_metadata=[True, False])
-    def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata):
+            use_group_metadata=[True, False],
+            metadata_quorum=quorum.all_non_upgrade)
+    def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum=quorum.zk):
         security_protocol = 'PLAINTEXT'
         self.kafka.security_protocol = security_protocol
         self.kafka.interbroker_security_protocol = security_protocol
diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py
index ae0b3e7..183e490 100644
--- a/tests/kafkatest/tests/core/upgrade_test.py
+++ b/tests/kafkatest/tests/core/upgrade_test.py
@@ -24,7 +24,7 @@ from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
 from kafkatest.utils.remote_account import java_version
-from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, V_0_9_0_0, V_0_11_0_0, V_2_8_0, DEV_BRANCH, KafkaVersion
+from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, V_0_11_0_0, V_2_8_0, DEV_BRANCH, KafkaVersion
 from kafkatest.services.kafka.util import new_jdk_not_supported
 
 class TestUpgrade(ProduceConsumeValidateTest):
@@ -171,7 +171,7 @@ class TestUpgrade(ProduceConsumeValidateTest):
         # after leader change. Tolerate limited data loss for this case to avoid transient test failures.
         self.may_truncate_acked_records = False if from_kafka_version >= V_0_11_0_0 else True
 
-        new_consumer = from_kafka_version >= V_0_9_0_0
+        new_consumer = from_kafka_version.consumer_supports_bootstrap_server()
         # TODO - reduce the timeout
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
                                         self.topic, new_consumer=new_consumer, consumer_timeout_ms=30000,
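
The one-line change above removes the last direct comparison against V_0_9_0_0 in this file, which is why the constant can be dropped from the import. A hedged sketch of what the predicate presumably looks like on kafkatest.version.KafkaVersion, matching the comparison it replaces; this is an assumption, not code from the commit:

    # Assumed shape of the predicate, equivalent to `from_kafka_version >= V_0_9_0_0`:
    # the new consumer (and its --bootstrap-server flag) first shipped in 0.9.0.
    def consumer_supports_bootstrap_server(self):
        return self >= V_0_9_0_0
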
diff --git a/tests/kafkatest/tests/end_to_end.py b/tests/kafkatest/tests/end_to_end.py
index 7ef6b97..bfc316e 100644
--- a/tests/kafkatest/tests/end_to_end.py
+++ b/tests/kafkatest/tests/end_to_end.py
@@ -16,7 +16,7 @@
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
 
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.kafka import TopicPartition
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.verifiable_consumer import VerifiableConsumer
@@ -41,8 +41,8 @@ class EndToEndTest(Test):
         self.records_consumed = []
         self.last_consumed_offsets = {}
         
-    def create_zookeeper(self, num_nodes=1, **kwargs):
-        self.zk = ZookeeperService(self.test_context, num_nodes=num_nodes, **kwargs)
+    def create_zookeeper_if_necessary(self, num_nodes=1, **kwargs):
+        self.zk = ZookeeperService(self.test_context, num_nodes=num_nodes, **kwargs) if quorum.for_test(self.test_context) == quorum.zk else None
 
     def create_kafka(self, num_nodes=1, **kwargs):
         group_metadata_config = {
diff --git a/tests/kafkatest/tests/kafka_test.py b/tests/kafkatest/tests/kafka_test.py
index 7118721..7852768 100644
--- a/tests/kafkatest/tests/kafka_test.py
+++ b/tests/kafkatest/tests/kafka_test.py
@@ -17,7 +17,7 @@ from ducktape.tests.test import Test
 
 
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 
 
 class KafkaTest(Test):
@@ -34,12 +34,14 @@ class KafkaTest(Test):
         self.num_brokers = num_brokers
         self.topics = topics
 
-        self.zk = ZookeeperService(test_context, self.num_zk)
+        self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None
 
         self.kafka = KafkaService(
             test_context, self.num_brokers,
-            self.zk, topics=self.topics)
+            self.zk, topics=self.topics,
+            controller_num_nodes_override=self.num_zk)
 
     def setUp(self):
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
         self.kafka.start()
\ No newline at end of file
diff --git a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
index 4ac2795..ca07828 100644
--- a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
@@ -15,6 +15,7 @@
 
 import random
 from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
 from kafkatest.services.kafka import KafkaService
@@ -50,6 +51,7 @@ class StreamsUpgradeTest(Test):
             node.version = KafkaVersion(to_version)
             self.kafka.start_node(node)
 
+    @cluster(num_nodes=6)
     @matrix(from_version=smoke_test_versions, to_version=dev_version, bounce_type=["full"])
     def test_app_upgrade(self, from_version, to_version, bounce_type):
         """
diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
index 6ebd1a5..69fc350 100644
--- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
 from kafkatest.services.kafka import KafkaService
@@ -61,6 +62,7 @@ class StreamsBrokerCompatibility(Test):
         self.zk.start()
 
 
+    @cluster(num_nodes=4)
     @parametrize(broker_version=str(LATEST_2_4))
     @parametrize(broker_version=str(LATEST_2_3))
     @parametrize(broker_version=str(LATEST_2_2))
@@ -85,6 +87,7 @@ class StreamsBrokerCompatibility(Test):
         self.consumer.stop()
         self.kafka.stop()
 
+    @cluster(num_nodes=4)
     @parametrize(broker_version=str(LATEST_2_6))
     @parametrize(broker_version=str(LATEST_2_5))
     @parametrize(broker_version=str(LATEST_2_4))
@@ -129,6 +132,7 @@ class StreamsBrokerCompatibility(Test):
     #     self.consumer.stop()
     #     self.kafka.stop()
 
+    @cluster(num_nodes=4)
     @parametrize(broker_version=str(LATEST_0_10_2))
     @parametrize(broker_version=str(LATEST_0_10_1))
     @parametrize(broker_version=str(LATEST_0_10_0))
@@ -146,6 +150,7 @@ class StreamsBrokerCompatibility(Test):
 
         self.kafka.stop()
 
+    @cluster(num_nodes=4)
     @parametrize(broker_version=str(LATEST_2_4))
     @parametrize(broker_version=str(LATEST_2_3))
     @parametrize(broker_version=str(LATEST_2_2))
diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index 8fcf14a..5026d7a 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import time
+from ducktape.mark.resource import cluster
 from kafkatest.services.streams import StreamsBrokerDownResilienceService
 from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
 
@@ -40,6 +41,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
     def setUp(self):
         self.zk.start()
 
+    @cluster(num_nodes=5)
     def test_streams_resilient_to_broker_down(self):
         self.kafka.start()
 
@@ -75,6 +77,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
 
         self.kafka.stop()
 
+    @cluster(num_nodes=7)
     def test_streams_runs_with_broker_down_initially(self):
         self.kafka.start()
         node = self.kafka.leader(self.inputTopic)
@@ -141,6 +144,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
 
         self.kafka.stop()
 
+    @cluster(num_nodes=7)
     def test_streams_should_scale_in_while_brokers_down(self):
         self.kafka.start()
 
@@ -218,6 +222,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
 
         self.kafka.stop()
 
+    @cluster(num_nodes=7)
     def test_streams_should_failover_while_brokers_down(self):
         self.kafka.start()
 
diff --git a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py
index 461573a..4658a53 100644
--- a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py
@@ -15,6 +15,7 @@
 
 import time
 from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.verifiable_producer import VerifiableProducer
@@ -66,6 +67,7 @@ class StreamsCooperativeRebalanceUpgradeTest(Test):
                                            throughput=1000,
                                            acks=1)
 
+    @cluster(num_nodes=8)
     @matrix(upgrade_from_version=streams_eager_rebalance_upgrade_versions)
     def test_upgrade_to_cooperative_rebalance(self, upgrade_from_version):
         self.zookeeper.start()
diff --git a/tests/kafkatest/tests/streams/streams_optimized_test.py b/tests/kafkatest/tests/streams/streams_optimized_test.py
index 3209b25..b96ec10 100644
--- a/tests/kafkatest/tests/streams/streams_optimized_test.py
+++ b/tests/kafkatest/tests/streams/streams_optimized_test.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import time
+from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
 from kafkatest.services.kafka import KafkaService
@@ -56,6 +56,7 @@ class StreamsOptimizedTest(Test):
                                            throughput=1000,
                                            acks=1)
 
+    @cluster(num_nodes=9)
     def test_upgrade_optimized_topology(self):
         self.zookeeper.start()
         self.kafka.start()
diff --git a/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py b/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py
index 482da9c..d190a1c 100644
--- a/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py
+++ b/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.mark.resource import cluster
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.streams import StreamsSmokeTestShutdownDeadlockService
 
@@ -29,6 +30,7 @@ class StreamsShutdownDeadlockTest(KafkaTest):
 
         self.driver = StreamsSmokeTestShutdownDeadlockService(test_context, self.kafka)
 
+    @cluster(num_nodes=3)
     def test_shutdown_wont_deadlock(self):
         """
         Start ShutdownDeadLockTest, wait for up to 1 minute, and check that the process exited.
diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py
index 1a4f296..b1f908d 100644
--- a/tests/kafkatest/tests/streams/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_smoke_test.py
@@ -16,6 +16,7 @@
 from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 
+from kafkatest.services.kafka import quorum
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
 
@@ -46,8 +47,8 @@ class StreamsSmokeTest(KafkaTest):
         self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
 
     @cluster(num_nodes=8)
-    @matrix(processing_guarantee=['at_least_once', 'exactly_once', 'exactly_once_beta'], crash=[True, False])
-    def test_streams(self, processing_guarantee, crash):
+    @matrix(processing_guarantee=['at_least_once', 'exactly_once', 'exactly_once_beta'], crash=[True, False], metadata_quorum=quorum.all_non_upgrade)
+    def test_streams(self, processing_guarantee, crash, metadata_quorum=quorum.zk):
         processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
         processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
         processor3 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
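
Note on the streams_smoke_test.py hunk above: the new metadata_quorum axis defaults to quorum.zk, so the method signature stays backward compatible while the @matrix also schedules every existing combination against the KIP-500 quorum(s) in quorum.all_non_upgrade. A rough sketch of how the quorum helper presumably resolves that parameter at runtime (an assumption about its behaviour for illustration, not the actual implementation):

    zk = 'ZK'   # placeholder constant for this sketch

    def for_test(test_context):
        # Default to ZooKeeper when the test was not parametrized with metadata_quorum.
        injected = test_context.injected_args or {}
        return injected.get('metadata_quorum', zk)
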
diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
index e847c3e..a8c0751 100644
--- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 from kafkatest.services.streams import StreamsStandbyTaskService
 from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
@@ -43,6 +44,7 @@ class StreamsStandbyTask(BaseStreamsTest):
                                                                                  'replication-factor': 1}
                                                  })
 
+    @cluster(num_nodes=10)
     def test_standby_tasks_rebalance(self):
         # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
         configs = self.get_configs(
diff --git a/tests/kafkatest/tests/streams/streams_static_membership_test.py b/tests/kafkatest/tests/streams/streams_static_membership_test.py
index e6072f4..f31c38a 100644
--- a/tests/kafkatest/tests/streams/streams_static_membership_test.py
+++ b/tests/kafkatest/tests/streams/streams_static_membership_test.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.streams import StaticMemberTestService
@@ -48,6 +49,7 @@ class StreamsStaticMembershipTest(Test):
                                            throughput=1000,
                                            acks=1)
 
+    @cluster(num_nodes=8)
     def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self):
         self.zookeeper.start()
         self.kafka.start()
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 78f171a..9aff673 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -186,6 +186,7 @@ class StreamsUpgradeTest(Test):
         processor.stop()
         processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
 
+    @cluster(num_nodes=6)
     @matrix(from_version=metadata_1_versions, to_version=[str(DEV_VERSION)])
     @matrix(from_version=metadata_2_versions, to_version=[str(DEV_VERSION)])
     def test_metadata_upgrade(self, from_version, to_version):
@@ -238,6 +239,7 @@ class StreamsUpgradeTest(Test):
                                    timeout_sec=60,
                                    err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
 
+    @cluster(num_nodes=6)
     def test_version_probing_upgrade(self):
         """
         Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version"
diff --git a/tests/kafkatest/tests/tools/log4j_appender_test.py b/tests/kafkatest/tests/tools/log4j_appender_test.py
index 61a5d2a..0287f2f 100644
--- a/tests/kafkatest/tests/tools/log4j_appender_test.py
+++ b/tests/kafkatest/tests/tools/log4j_appender_test.py
@@ -20,7 +20,7 @@ from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.kafka_log4j_appender import KafkaLog4jAppender
 
@@ -41,16 +41,18 @@ class Log4jAppenderTest(Test):
             TOPIC: {'partitions': 1, 'replication-factor': 1}
         }
 
-        self.zk = ZookeeperService(test_context, self.num_zk)
+        self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None
 
     def setUp(self):
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
 
     def start_kafka(self, security_protocol, interbroker_security_protocol):
         self.kafka = KafkaService(
             self.test_context, self.num_brokers,
             self.zk, security_protocol=security_protocol,
-            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
+            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics,
+            controller_num_nodes_override=self.num_zk)
         self.kafka.start()
 
     def start_appender(self, security_protocol):
@@ -70,10 +72,10 @@ class Log4jAppenderTest(Test):
         self.consumer.start()
 
     @cluster(num_nodes=4)
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all_non_upgrade)
     @cluster(num_nodes=5)
-    @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'])
-    def test_log4j_appender(self, security_protocol='PLAINTEXT'):
+    @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], metadata_quorum=quorum.all_non_upgrade)
+    def test_log4j_appender(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk):
         """
         Tests if KafkaLog4jAppender is producing to Kafka topic
         :return: None
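
Note on the log4j_appender_test.py hunk above: besides the conditional ZooKeeper service, the pre-existing stacked @cluster/@matrix layout is kept, where each @cluster sizes the test cases produced by the @matrix directly beneath it, so the SASL variants reserve one node more than the PLAINTEXT/SSL ones. A condensed sketch of that decorator pairing for a hypothetical tool test (class, method and body are invented for illustration):

    from ducktape.mark import matrix
    from ducktape.mark.resource import cluster
    from ducktape.tests.test import Test
    from kafkatest.services.kafka import quorum

    class ExampleToolTest(Test):
        @cluster(num_nodes=4)    # sizes the PLAINTEXT/SSL cases generated just below
        @matrix(security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all_non_upgrade)
        @cluster(num_nodes=5)    # sizes the SASL cases, which need an extra node
        @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], metadata_quorum=quorum.all_non_upgrade)
        def test_tool(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk):
            pass   # body omitted; only the annotation pairing is being illustrated
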
diff --git a/tests/kafkatest/tests/tools/log_compaction_test.py b/tests/kafkatest/tests/tools/log_compaction_test.py
index 338060f..a91a976 100644
--- a/tests/kafkatest/tests/tools/log_compaction_test.py
+++ b/tests/kafkatest/tests/tools/log_compaction_test.py
@@ -14,13 +14,14 @@
 # limitations under the License.
 
 
+from ducktape.mark import matrix
 from ducktape.utils.util import wait_until
 from ducktape.tests.test import Test
 from ducktape.mark.resource import cluster
 
 from kafkatest.services.kafka import config_property
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.log_compaction_tester import LogCompactionTester
 
 class LogCompactionTest(Test):
@@ -33,12 +34,13 @@ class LogCompactionTest(Test):
         self.num_zk = 1
         self.num_brokers = 1
 
-        self.zk = ZookeeperService(test_context, self.num_zk)
+        self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None
         self.kafka = None
         self.compaction_verifier = None
 
     def setUp(self):
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
 
     def start_kafka(self, security_protocol, interbroker_security_protocol):
         self.kafka = KafkaService(
@@ -49,7 +51,8 @@ class LogCompactionTest(Test):
             interbroker_security_protocol=interbroker_security_protocol,
             server_prop_overides=[
                 [config_property.LOG_SEGMENT_BYTES, LogCompactionTest.LOG_SEGMENT_BYTES],
-            ])
+            ],
+            controller_num_nodes_override=self.num_zk)
         self.kafka.start()
 
     def start_test_log_compaction_tool(self, security_protocol):
@@ -57,7 +60,8 @@ class LogCompactionTest(Test):
         self.compaction_verifier.start()
 
     @cluster(num_nodes=4)
-    def test_log_compaction(self, security_protocol='PLAINTEXT'):
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_log_compaction(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk):
 
         self.start_kafka(security_protocol, security_protocol)
         self.start_test_log_compaction_tool(security_protocol)
diff --git a/tests/kafkatest/tests/tools/replica_verification_test.py b/tests/kafkatest/tests/tools/replica_verification_test.py
index f296c73..baa0536 100644
--- a/tests/kafkatest/tests/tools/replica_verification_test.py
+++ b/tests/kafkatest/tests/tools/replica_verification_test.py
@@ -14,13 +14,14 @@
 # limitations under the License.
 
 
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 from ducktape.tests.test import Test
-from ducktape.mark.resource import cluster
 
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.replica_verification_tool import ReplicaVerificationTool
 
 TOPIC = "topic-replica-verification"
@@ -39,19 +40,21 @@ class ReplicaVerificationToolTest(Test):
             TOPIC: {'partitions': 1, 'replication-factor': 2}
         }
 
-        self.zk = ZookeeperService(test_context, self.num_zk)
+        self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None
         self.kafka = None
         self.producer = None
         self.replica_verifier = None
 
     def setUp(self):
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
 
     def start_kafka(self, security_protocol, interbroker_security_protocol):
         self.kafka = KafkaService(
             self.test_context, self.num_brokers,
             self.zk, security_protocol=security_protocol,
-            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
+            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics,
+            controller_num_nodes_override=self.num_zk)
         self.kafka.start()
 
     def start_replica_verification_tool(self, security_protocol):
@@ -70,7 +73,8 @@ class ReplicaVerificationToolTest(Test):
         self.producer.stop()
 
     @cluster(num_nodes=6)
-    def test_replica_lags(self, security_protocol='PLAINTEXT'):
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_replica_lags(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk):
         """
         Tests ReplicaVerificationTool
         :return: None
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index dc6083e..566dba5 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -62,6 +62,18 @@ class KafkaVersion(LooseVersion):
 
         return LooseVersion._cmp(self, other)
 
+    def consumer_supports_bootstrap_server(self):
+        """
+        Kafka supported a new consumer beginning with v0.9.0 where
+        we can specify --bootstrap-server instead of --zookeeper.
+
+        This version also allowed a --consumer-config file where we could specify
+        a security protocol other than PLAINTEXT.
+
+        :return: true if the version of Kafka supports a new consumer with --bootstrap-server
+        """
+        return self >= V_0_9_0_0
+
     def supports_named_listeners(self):
         return self >= V_0_10_2_0