You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/09/14 23:10:15 UTC
[kafka] branch 3.3 updated: MINOR: Adds KRaft versions of most streams system tests (#12458)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 2b53b609586 MINOR: Adds KRaft versions of most streams system tests (#12458)
2b53b609586 is described below
commit 2b53b609586f968104a1b516c0f129b86827ef9e
Author: Alan Sheinberg <57...@users.noreply.github.com>
AuthorDate: Fri Aug 26 14:11:19 2022 -0700
MINOR: Adds KRaft versions of most streams system tests (#12458)
Migrates Streams sustem tests to either use kraft brokers or to use both kraft and zk in a testing matrix.
This skips tests which use various forms of Kafka versioning since those seem to have issues with KRaft at the moment. Running these tests with KRaft will require a followup PR.
Reviewers: Guozhang Wang <gu...@apache.org>, John Roesler <vv...@apache.org>
---
.../tests/streams/streams_broker_bounce_test.py | 39 +++++++++++++++-------
.../streams/streams_broker_down_resilience_test.py | 17 +++++++---
tests/kafkatest/tests/streams/streams_eos_test.py | 27 +++++++--------
.../streams_named_repartition_topic_test.py | 20 +++++++----
.../tests/streams/streams_optimized_test.py | 20 +++++++----
.../tests/streams/streams_relational_smoke_test.py | 6 ++--
.../streams/streams_shutdown_deadlock_test.py | 5 ++-
.../kafkatest/tests/streams/streams_smoke_test.py | 5 +--
.../tests/streams/streams_standby_replica_test.py | 5 ++-
.../streams/streams_static_membership_test.py | 20 +++++++----
10 files changed, 110 insertions(+), 54 deletions(-)
diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index d8992808127..c5fdfcf8743 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -18,7 +18,7 @@ from ducktape.tests.test import Test
from ducktape.mark.resource import cluster
from ducktape.mark import matrix
from ducktape.mark import ignore
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
import time
@@ -67,7 +67,8 @@ def hard_bounce(test, topic, broker_type):
# Since this is a hard kill, we need to make sure the process is down and that
# zookeeper has registered the loss by expiring the broker's session timeout.
- wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node),
+ wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and
+ not (quorum.for_test(test.test_context) == quorum.zk and test.kafka.is_registered(prev_broker_node)),
timeout_sec=test.kafka.zk_session_timeout + 5,
err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account))
@@ -151,10 +152,16 @@ class StreamsBrokerBounceTest(Test):
def setup_system(self, start_processor=True, num_threads=3):
# Setup phase
- self.zk = ZookeeperService(self.test_context, num_nodes=1)
- self.zk.start()
-
- self.kafka = KafkaService(self.test_context, num_nodes=self.replication, zk=self.zk, topics=self.topics)
+ self.zk = (
+ ZookeeperService(self.test_context, 1)
+ if quorum.for_test(self.test_context) == quorum.zk
+ else None
+ )
+ if self.zk:
+ self.zk.start()
+
+ self.kafka = KafkaService(self.test_context, num_nodes=self.replication, zk=self.zk, topics=self.topics,
+ controller_num_nodes_override=1)
self.kafka.start()
# allow some time for topics to be created
@@ -205,11 +212,17 @@ class StreamsBrokerBounceTest(Test):
return data
@cluster(num_nodes=7)
+ @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
+ broker_type=["leader"],
+ num_threads=[1, 3],
+ sleep_time_secs=[120],
+ metadata_quorum=[quorum.remote_kraft])
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
broker_type=["leader", "controller"],
num_threads=[1, 3],
- sleep_time_secs=[120])
- def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads):
+ sleep_time_secs=[120],
+ metadata_quorum=[quorum.zk])
+ def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads, metadata_quorum):
"""
Start a smoke test client, then kill one particular broker and ensure data is still received
Record if records are delivered.
@@ -251,8 +264,9 @@ class StreamsBrokerBounceTest(Test):
@cluster(num_nodes=7)
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
- num_failures=[2])
- def test_many_brokers_bounce(self, failure_mode, num_failures):
+ num_failures=[2],
+ metadata_quorum=quorum.all_non_upgrade)
+ def test_many_brokers_bounce(self, failure_mode, num_failures, metadata_quorum=quorum.zk):
"""
Start a smoke test client, then kill a few brokers and ensure data is still received
Record if records are delivered
@@ -269,8 +283,9 @@ class StreamsBrokerBounceTest(Test):
@cluster(num_nodes=7)
@matrix(failure_mode=["clean_bounce", "hard_bounce"],
- num_failures=[3])
- def test_all_brokers_bounce(self, failure_mode, num_failures):
+ num_failures=[3],
+ metadata_quorum=quorum.all_non_upgrade)
+ def test_all_brokers_bounce(self, failure_mode, num_failures, metadata_quorum=quorum.zk):
"""
Start a smoke test client, then kill a few brokers and ensure data is still received
Record if records are delivered
diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index 6957d699617..182c1e789b9 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -14,7 +14,9 @@
# limitations under the License.
import time
+from ducktape.mark import matrix
from ducktape.mark.resource import cluster
+from kafkatest.services.kafka import quorum
from kafkatest.services.streams import StreamsBrokerDownResilienceService
from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
@@ -39,10 +41,12 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
num_brokers=1)
def setUp(self):
- self.zk.start()
+ if self.zk:
+ self.zk.start()
@cluster(num_nodes=7)
- def test_streams_resilient_to_broker_down(self):
+ @matrix(metadata_quorum=[quorum.remote_kraft])
+ def test_streams_resilient_to_broker_down(self, metadata_quorum):
self.kafka.start()
# Broker should be down over 2x of retries * timeout ms
@@ -78,7 +82,8 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
self.kafka.stop()
@cluster(num_nodes=7)
- def test_streams_runs_with_broker_down_initially(self):
+ @matrix(metadata_quorum=[quorum.remote_kraft])
+ def test_streams_runs_with_broker_down_initially(self, metadata_quorum):
self.kafka.start()
node = self.kafka.leader(self.inputTopic)
self.kafka.stop_node(node)
@@ -145,7 +150,8 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
self.kafka.stop()
@cluster(num_nodes=9)
- def test_streams_should_scale_in_while_brokers_down(self):
+ @matrix(metadata_quorum=[quorum.remote_kraft])
+ def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum):
self.kafka.start()
# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
@@ -223,7 +229,8 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
self.kafka.stop()
@cluster(num_nodes=9)
- def test_streams_should_failover_while_brokers_down(self):
+ @matrix(metadata_quorum=[quorum.remote_kraft])
+ def test_streams_should_failover_while_brokers_down(self, metadata_quorum):
self.kafka.start()
# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py
index 618d378ade1..8d8853ac461 100644
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -13,9 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ducktape.mark import parametrize
+from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.kafka import quorum
from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
StreamsComplexEosTestJobRunnerService, StreamsEosTestVerifyRunnerService, StreamsComplexEosTestVerifyRunnerService
@@ -38,18 +39,18 @@ class StreamsEosTest(KafkaTest):
self.test_context = test_context
@cluster(num_nodes=9)
- @parametrize(processing_guarantee="exactly_once")
- @parametrize(processing_guarantee="exactly_once_v2")
- def test_rebalance_simple(self, processing_guarantee):
+ @matrix(processing_guarantee=["exactly_once", "exactly_once_v2"],
+ metadata_quorum=[quorum.remote_kraft])
+ def test_rebalance_simple(self, processing_guarantee, metadata_quorum):
self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
@cluster(num_nodes=9)
- @parametrize(processing_guarantee="exactly_once")
- @parametrize(processing_guarantee="exactly_once_v2")
- def test_rebalance_complex(self, processing_guarantee):
+ @matrix(processing_guarantee=["exactly_once", "exactly_once_v2"],
+ metadata_quorum=[quorum.remote_kraft])
+ def test_rebalance_complex(self, processing_guarantee, metadata_quorum):
self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
@@ -82,18 +83,18 @@ class StreamsEosTest(KafkaTest):
verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
@cluster(num_nodes=9)
- @parametrize(processing_guarantee="exactly_once")
- @parametrize(processing_guarantee="exactly_once_v2")
- def test_failure_and_recovery(self, processing_guarantee):
+ @matrix(processing_guarantee=["exactly_once", "exactly_once_v2"],
+ metadata_quorum=[quorum.remote_kraft])
+ def test_failure_and_recovery(self, processing_guarantee, metadata_quorum):
self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
@cluster(num_nodes=9)
- @parametrize(processing_guarantee="exactly_once")
- @parametrize(processing_guarantee="exactly_once_v2")
- def test_failure_and_recovery_complex(self, processing_guarantee):
+ @matrix(processing_guarantee=["exactly_once", "exactly_once_v2"],
+ metadata_quorum=[quorum.remote_kraft])
+ def test_failure_and_recovery_complex(self, processing_guarantee, metadata_quorum):
self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
diff --git a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
index ce270d303a6..423b01c41e6 100644
--- a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
+++ b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
@@ -13,9 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.streams import StreamsNamedRepartitionTopicService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
@@ -40,9 +41,13 @@ class StreamsNamedRepartitionTopicTest(Test):
self.aggregation_topic: {'partitions': 6}
}
- self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
+ self.zookeeper = (
+ ZookeeperService(self.test_context, 1)
+ if quorum.for_test(self.test_context) == quorum.zk
+ else None
+ )
self.kafka = KafkaService(self.test_context, num_nodes=3,
- zk=self.zookeeper, topics=self.topics)
+ zk=self.zookeeper, topics=self.topics, controller_num_nodes_override=1)
self.producer = VerifiableProducer(self.test_context,
1,
@@ -52,8 +57,10 @@ class StreamsNamedRepartitionTopicTest(Test):
acks=1)
@cluster(num_nodes=8)
- def test_upgrade_topology_with_named_repartition_topic(self):
- self.zookeeper.start()
+ @matrix(metadata_quorum=[quorum.remote_kraft])
+ def test_upgrade_topology_with_named_repartition_topic(self, metadata_quorum):
+ if self.zookeeper:
+ self.zookeeper.start()
self.kafka.start()
processor1 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
@@ -84,7 +91,8 @@ class StreamsNamedRepartitionTopicTest(Test):
self.producer.stop()
self.kafka.stop()
- self.zookeeper.stop()
+ if self.zookeeper:
+ self.zookeeper.stop()
def verify_processing(self, processors):
for processor in processors:
diff --git a/tests/kafkatest/tests/streams/streams_optimized_test.py b/tests/kafkatest/tests/streams/streams_optimized_test.py
index b96ec10d6ba..56506841896 100644
--- a/tests/kafkatest/tests/streams/streams_optimized_test.py
+++ b/tests/kafkatest/tests/streams/streams_optimized_test.py
@@ -13,10 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.streams import StreamsOptimizedUpgradeTestService
from kafkatest.services.streams import StreamsResetter
from kafkatest.services.verifiable_producer import VerifiableProducer
@@ -45,9 +46,13 @@ class StreamsOptimizedTest(Test):
self.join_topic: {'partitions': 6}
}
- self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
+ self.zookeeper = (
+ ZookeeperService(self.test_context, 1)
+ if quorum.for_test(self.test_context) == quorum.zk
+ else None
+ )
self.kafka = KafkaService(self.test_context, num_nodes=3,
- zk=self.zookeeper, topics=self.topics)
+ zk=self.zookeeper, topics=self.topics, controller_num_nodes_override=1)
self.producer = VerifiableProducer(self.test_context,
1,
@@ -57,8 +62,10 @@ class StreamsOptimizedTest(Test):
acks=1)
@cluster(num_nodes=9)
- def test_upgrade_optimized_topology(self):
- self.zookeeper.start()
+ @matrix(metadata_quorum=[quorum.remote_kraft])
+ def test_upgrade_optimized_topology(self, metadata_quorum):
+ if self.zookeeper:
+ self.zookeeper.start()
self.kafka.start()
processor1 = StreamsOptimizedUpgradeTestService(self.test_context, self.kafka)
@@ -104,7 +111,8 @@ class StreamsOptimizedTest(Test):
self.logger.info("teardown")
self.producer.stop()
self.kafka.stop()
- self.zookeeper.stop()
+ if self.zookeeper:
+ self.zookeeper.stop()
def reset_application(self):
resetter = StreamsResetter(self.test_context, self.kafka, topic = self.input_topic, applicationId = 'StreamsOptimizedTest')
diff --git a/tests/kafkatest/tests/streams/streams_relational_smoke_test.py b/tests/kafkatest/tests/streams/streams_relational_smoke_test.py
index fe10a29369e..b91ec0c3e60 100644
--- a/tests/kafkatest/tests/streams/streams_relational_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_relational_smoke_test.py
@@ -17,6 +17,7 @@
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import quorum
from kafkatest.services.streams import StreamsTestBaseService
from kafkatest.tests.kafka_test import KafkaTest
@@ -85,8 +86,9 @@ class StreamsRelationalSmokeTest(KafkaTest):
@cluster(num_nodes=8)
@matrix(crash=[False, True],
- processing_guarantee=['exactly_once', 'exactly_once_v2'])
- def test_streams(self, crash, processing_guarantee):
+ processing_guarantee=['exactly_once', 'exactly_once_v2'],
+ metadata_quorum=[quorum.remote_kraft])
+ def test_streams(self, crash, processing_guarantee, metadata_quorum):
driver = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "driver", "ignored", "ignored")
LOG_FILE = driver.LOG_FILE # this is the same for all instances of the service, so we can just declare a "constant"
diff --git a/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py b/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py
index d190a1c3119..3da5b481da9 100644
--- a/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py
+++ b/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py
@@ -13,8 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.kafka import quorum
from kafkatest.services.streams import StreamsSmokeTestShutdownDeadlockService
@@ -31,7 +33,8 @@ class StreamsShutdownDeadlockTest(KafkaTest):
self.driver = StreamsSmokeTestShutdownDeadlockService(test_context, self.kafka)
@cluster(num_nodes=3)
- def test_shutdown_wont_deadlock(self):
+ @matrix(metadata_quorum=[quorum.remote_kraft])
+ def test_shutdown_wont_deadlock(self, metadata_quorum):
"""
Start ShutdownDeadLockTest, wait for upt to 1 minute, and check that the process exited.
If it hasn't exited then fail as it is deadlocked
diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py
index 6cd2b7611a3..8a119f37d99 100644
--- a/tests/kafkatest/tests/streams/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_smoke_test.py
@@ -47,8 +47,9 @@ class StreamsSmokeTest(KafkaTest):
self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
@cluster(num_nodes=8)
- @matrix(processing_guarantee=['at_least_once'], crash=[True, False], metadata_quorum=quorum.all_non_upgrade)
- @matrix(processing_guarantee=['exactly_once', 'exactly_once_v2'], crash=[True, False])
+ @matrix(processing_guarantee=['exactly_once', 'exactly_once_v2', 'at_least_once'],
+ crash=[True, False],
+ metadata_quorum=quorum.all_non_upgrade)
def test_streams(self, processing_guarantee, crash, metadata_quorum=quorum.zk):
processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
index a8c07513c1c..d11979385ac 100644
--- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -13,8 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import quorum
from kafkatest.services.streams import StreamsStandbyTaskService
from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
@@ -45,7 +47,8 @@ class StreamsStandbyTask(BaseStreamsTest):
})
@cluster(num_nodes=10)
- def test_standby_tasks_rebalance(self):
+ @matrix(metadata_quorum=[quorum.remote_kraft])
+ def test_standby_tasks_rebalance(self, metadata_quorum):
# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
configs = self.get_configs(
",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor" % (
diff --git a/tests/kafkatest/tests/streams/streams_static_membership_test.py b/tests/kafkatest/tests/streams/streams_static_membership_test.py
index 5838484be10..c2241c5e2b1 100644
--- a/tests/kafkatest/tests/streams/streams_static_membership_test.py
+++ b/tests/kafkatest/tests/streams/streams_static_membership_test.py
@@ -13,9 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.streams import StaticMemberTestService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
@@ -38,9 +39,13 @@ class StreamsStaticMembershipTest(Test):
self.input_topic: {'partitions': 18},
}
- self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
+ self.zookeeper = (
+ ZookeeperService(self.test_context, 1)
+ if quorum.for_test(self.test_context) == quorum.zk
+ else None
+ )
self.kafka = KafkaService(self.test_context, num_nodes=3,
- zk=self.zookeeper, topics=self.topics)
+ zk=self.zookeeper, topics=self.topics, controller_num_nodes_override=1)
self.producer = VerifiableProducer(self.test_context,
1,
@@ -50,8 +55,10 @@ class StreamsStaticMembershipTest(Test):
acks=1)
@cluster(num_nodes=8)
- def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self):
- self.zookeeper.start()
+ @matrix(metadata_quorum=[quorum.remote_kraft])
+ def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self, metadata_quorum):
+ if self.zookeeper:
+ self.zookeeper.start()
self.kafka.start()
numThreads = 3
@@ -97,7 +104,8 @@ class StreamsStaticMembershipTest(Test):
self.producer.stop()
self.kafka.stop(timeout_sec=120)
- self.zookeeper.stop()
+ if self.zookeeper:
+ self.zookeeper.stop()
def verify_processing(self, processors):
for processor in processors: