You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/12/20 08:36:10 UTC
[kafka] branch trunk updated: KAFKA-14343: Upgrade tests for state updater (#12801)
This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c8675d4723a KAFKA-14343: Upgrade tests for state updater (#12801)
c8675d4723a is described below
commit c8675d4723a45d99aee6f64c6bf4703eeaea81aa
Author: Lucas Brutschy <lu...@users.noreply.github.com>
AuthorDate: Tue Dec 20 09:35:59 2022 +0100
KAFKA-14343: Upgrade tests for state updater (#12801)
A test that verifies the upgrade from a version of Streams with
state updater disabled to a version with state updater enabled
and vice versa, so that we can offer a save upgrade path.
- upgrade test from a version of Streams with state updater
disabled to a version with state updater enabled
- downgrade test from a version of Streams with state updater
enabled to a version with state updater disabled
Reviewer: Bruno Cadonna <ca...@apache.org>
---
.../tests/streams/streams_upgrade_test.py | 96 +++++++++++++---------
1 file changed, 59 insertions(+), 37 deletions(-)
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 503135b416e..961be68acfc 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -211,24 +211,12 @@ class StreamsUpgradeTest(Test):
else:
extra_properties = {}
- self.zk = ZookeeperService(self.test_context, num_nodes=1)
- self.zk.start()
-
- self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
- self.kafka.start()
-
- self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
- self.driver.disable_auto_terminate()
- self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
- self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
- self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+ self.set_up_services()
self.driver.start()
self.start_all_nodes_with(from_version, extra_properties)
- self.processors = [self.processor1, self.processor2, self.processor3]
-
counter = 1
random.seed()
@@ -245,17 +233,7 @@ class StreamsUpgradeTest(Test):
self.do_stop_start_bounce(p, None, to_version, counter, extra_properties)
counter = counter + 1
- # shutdown
- self.driver.stop()
-
- random.shuffle(self.processors)
- for p in self.processors:
- node = p.node
- with node.account.monitor_log(p.STDOUT_FILE) as monitor:
- p.stop()
- monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
- timeout_sec=60,
- err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
+ self.stop_and_await()
@cluster(num_nodes=6)
def test_version_probing_upgrade(self):
@@ -263,22 +241,11 @@ class StreamsUpgradeTest(Test):
Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version"
"""
- self.zk = ZookeeperService(self.test_context, num_nodes=1)
- self.zk.start()
-
- self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
- self.kafka.start()
-
- self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
- self.driver.disable_auto_terminate()
- self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
- self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
- self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+ self.set_up_services()
self.driver.start()
self.start_all_nodes_with("") # run with TRUNK
- self.processors = [self.processor1, self.processor2, self.processor3]
self.old_processors = [self.processor1, self.processor2, self.processor3]
self.upgraded_processors = []
@@ -293,7 +260,62 @@ class StreamsUpgradeTest(Test):
current_generation = self.do_rolling_bounce(p, counter, current_generation)
counter = counter + 1
- # shutdown
+ self.stop_and_await()
+
+ @cluster(num_nodes=6)
+ @matrix(from_version=[str(LATEST_3_2), str(DEV_VERSION)], to_version=[str(DEV_VERSION)], upgrade=[True, False])
+ def test_upgrade_downgrade_state_updater(self, from_version, to_version, upgrade):
+ """
+ Starts 3 KafkaStreams instances, and enables / disables state restoration
+ for the instances in a rolling bounce.
+
+ Once same-thread state restoration is removed from the code, this test
+ should use different versions of the code.
+ """
+ if upgrade:
+ extra_properties_first = { '__state.updater.enabled__': 'false' }
+ first_version = from_version
+ extra_properties_second = { '__state.updater.enabled__': 'true' }
+ second_version = to_version
+ else:
+ extra_properties_first = { '__state.updater.enabled__': 'true' }
+ first_version = to_version
+ extra_properties_second = { '__state.updater.enabled__': 'false' }
+ second_version = from_version
+
+ self.set_up_services()
+
+ self.driver.start()
+ self.start_all_nodes_with(first_version, extra_properties_first)
+
+ counter = 1
+ random.seed()
+
+ # rolling bounce
+ random.shuffle(self.processors)
+ for p in self.processors:
+ p.CLEAN_NODE_ENABLED = True
+ self.do_stop_start_bounce(p, None, second_version, counter, extra_properties_second)
+ counter = counter + 1
+
+ self.stop_and_await()
+
+ def set_up_services(self):
+ self.zk = ZookeeperService(self.test_context, num_nodes=1)
+ self.zk.start()
+
+ self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
+ self.kafka.start()
+
+ self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
+ self.driver.disable_auto_terminate()
+ self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+ self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+ self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+
+ self.processors = [self.processor1, self.processor2, self.processor3]
+
+ def stop_and_await(self):
self.driver.stop()
random.shuffle(self.processors)