You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/04/18 21:22:59 UTC

[kafka] branch trunk updated: MINOR: Fix TestDowngrade.test_upgrade_and_downgrade (#12027)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6d36487b68 MINOR: Fix TestDowngrade.test_upgrade_and_downgrade (#12027)
6d36487b68 is described below

commit 6d36487b684fd41522cccd4da4fd88f0b89ff0b7
Author: David Jacot <dj...@confluent.io>
AuthorDate: Mon Apr 18 23:22:33 2022 +0200

    MINOR: Fix TestDowngrade.test_upgrade_and_downgrade (#12027)
    
    The second validation does not verify the second bounce because the verified producer and the verified consumer are stopped in `self.run_validation`. This means that the second `run_validation` just spit out the same information as the first one. Instead, we should just run the validation at the end.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 tests/kafkatest/tests/core/downgrade_test.py |  5 +++--
 tests/kafkatest/tests/end_to_end.py          | 18 ++++++++++++------
 2 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/tests/kafkatest/tests/core/downgrade_test.py b/tests/kafkatest/tests/core/downgrade_test.py
index 772ec96d6c..8fb6fd5c44 100644
--- a/tests/kafkatest/tests/core/downgrade_test.py
+++ b/tests/kafkatest/tests/core/downgrade_test.py
@@ -139,14 +139,15 @@ class TestDowngrade(EndToEndTest):
 
         self.logger.info("First pass bounce - rolling upgrade")
         self.upgrade_from(kafka_version)
-        self.run_validation()
+        self.await_consumed_records(min_records=5000)
 
         upgrade_topic_id = self.kafka.topic_id(self.topic)
         assert start_topic_id == upgrade_topic_id
 
         self.logger.info("Second pass bounce - rolling downgrade")
+        num_records_acked = self.producer.num_acked
         self.downgrade_to(kafka_version)
-        self.run_validation()
+        self.run_validation(min_records=num_records_acked+5000)
 
         downgrade_topic_id = self.kafka.topic_id(self.topic)
         assert upgrade_topic_id == downgrade_topic_id
diff --git a/tests/kafkatest/tests/end_to_end.py b/tests/kafkatest/tests/end_to_end.py
index 533056540b..3b9cf9f8c3 100644
--- a/tests/kafkatest/tests/end_to_end.py
+++ b/tests/kafkatest/tests/end_to_end.py
@@ -87,7 +87,13 @@ class EndToEndTest(Test):
         self.last_consumed_offsets[partition] = offset
         self.records_consumed.append(record_id)
 
-    def await_consumed_offsets(self, last_acked_offsets, timeout_sec):
+    def await_produced_records(self, min_records, timeout_sec=30):
+        wait_until(lambda: self.producer.num_acked > min_records,
+                   timeout_sec=timeout_sec,
+                   err_msg="Producer failed to produce messages for %ds." %\
+                   timeout_sec)
+
+    def await_consumed_offsets(self, last_acked_offsets, timeout_sec=30):
         def has_finished_consuming():
             for partition, offset in last_acked_offsets.items():
                 if not partition in self.last_consumed_offsets:
@@ -102,6 +108,10 @@ class EndToEndTest(Test):
                    err_msg="Consumer failed to consume up to offsets %s after waiting %ds." %\
                    (str(last_acked_offsets), timeout_sec))
 
+    def await_consumed_records(self, min_records, producer_timeout_sec=30,
+                               consumer_timeout_sec=30):
+        self.await_produced_records(min_records=min_records)
+        self.await_consumed_offsets(self.producer.last_acked_offsets)
 
     def _collect_all_logs(self):
         for s in self.test_context.services:
@@ -120,11 +130,7 @@ class EndToEndTest(Test):
     def run_validation(self, min_records=5000, producer_timeout_sec=30,
                        consumer_timeout_sec=30, enable_idempotence=False):
         try:
-            wait_until(lambda: self.producer.num_acked > min_records,
-                       timeout_sec=producer_timeout_sec,
-                       err_msg="Producer failed to produce messages for %ds." %\
-                       producer_timeout_sec)
-
+            self.await_produced_records(min_records, producer_timeout_sec)
             self.logger.info("Stopping producer after writing up to offsets %s" %\
                          str(self.producer.last_acked_offsets))
             self.producer.stop()