You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/12/20 08:36:10 UTC

[kafka] branch trunk updated: KAFKA-14343: Upgrade tests for state updater (#12801)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c8675d4723a KAFKA-14343: Upgrade tests for state updater  (#12801)
c8675d4723a is described below

commit c8675d4723a45d99aee6f64c6bf4703eeaea81aa
Author: Lucas Brutschy <lu...@users.noreply.github.com>
AuthorDate: Tue Dec 20 09:35:59 2022 +0100

    KAFKA-14343: Upgrade tests for state updater  (#12801)
    
    A test that verifies the upgrade from a version of Streams with
    state updater disabled to a version with state updater enabled
    and vice versa, so that we can offer a save upgrade path.
    
     - upgrade test from a version of Streams with state updater
    disabled to a version with state updater enabled
     - downgrade test from a version of Streams with state updater
     enabled to a version with state updater disabled
    
    Reviewer: Bruno Cadonna <ca...@apache.org>
---
 .../tests/streams/streams_upgrade_test.py          | 96 +++++++++++++---------
 1 file changed, 59 insertions(+), 37 deletions(-)

diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 503135b416e..961be68acfc 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -211,24 +211,12 @@ class StreamsUpgradeTest(Test):
         else:
             extra_properties = {}
 
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.zk.start()
-
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
-        self.kafka.start()
-
-        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-        self.driver.disable_auto_terminate()
-        self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
-        self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
-        self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+        self.set_up_services()
 
         self.driver.start()
 
         self.start_all_nodes_with(from_version, extra_properties)
 
-        self.processors = [self.processor1, self.processor2, self.processor3]
-
         counter = 1
         random.seed()
 
@@ -245,17 +233,7 @@ class StreamsUpgradeTest(Test):
             self.do_stop_start_bounce(p, None, to_version, counter, extra_properties)
             counter = counter + 1
 
-        # shutdown
-        self.driver.stop()
-
-        random.shuffle(self.processors)
-        for p in self.processors:
-            node = p.node
-            with node.account.monitor_log(p.STDOUT_FILE) as monitor:
-                p.stop()
-                monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
-                                   timeout_sec=60,
-                                   err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
+        self.stop_and_await()
 
     @cluster(num_nodes=6)
     def test_version_probing_upgrade(self):
@@ -263,22 +241,11 @@ class StreamsUpgradeTest(Test):
         Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version"
         """
 
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.zk.start()
-
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
-        self.kafka.start()
-
-        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-        self.driver.disable_auto_terminate()
-        self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
-        self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
-        self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+        self.set_up_services()
 
         self.driver.start()
         self.start_all_nodes_with("") # run with TRUNK
 
-        self.processors = [self.processor1, self.processor2, self.processor3]
         self.old_processors = [self.processor1, self.processor2, self.processor3]
         self.upgraded_processors = []
 
@@ -293,7 +260,62 @@ class StreamsUpgradeTest(Test):
             current_generation = self.do_rolling_bounce(p, counter, current_generation)
             counter = counter + 1
 
-        # shutdown
+        self.stop_and_await()
+
+    @cluster(num_nodes=6)
+    @matrix(from_version=[str(LATEST_3_2), str(DEV_VERSION)], to_version=[str(DEV_VERSION)], upgrade=[True, False])
+    def test_upgrade_downgrade_state_updater(self, from_version, to_version, upgrade):
+        """
+        Starts 3 KafkaStreams instances, and enables / disables state restoration
+        for the instances in a rolling bounce.
+
+        Once same-thread state restoration is removed from the code, this test
+        should use different versions of the code.
+        """
+        if upgrade:
+            extra_properties_first = { '__state.updater.enabled__': 'false' }
+            first_version = from_version
+            extra_properties_second = { '__state.updater.enabled__': 'true' }
+            second_version = to_version
+        else:
+            extra_properties_first = { '__state.updater.enabled__': 'true' }
+            first_version = to_version
+            extra_properties_second = { '__state.updater.enabled__': 'false' }
+            second_version = from_version
+
+        self.set_up_services()
+
+        self.driver.start()
+        self.start_all_nodes_with(first_version, extra_properties_first)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = True
+            self.do_stop_start_bounce(p, None, second_version, counter, extra_properties_second)
+            counter = counter + 1
+
+        self.stop_and_await()
+
+    def set_up_services(self):
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.zk.start()
+
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
+        self.kafka.start()
+
+        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
+        self.driver.disable_auto_terminate()
+        self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+        self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+        self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+
+        self.processors = [self.processor1, self.processor2, self.processor3]
+
+    def stop_and_await(self):
         self.driver.stop()
 
         random.shuffle(self.processors)