You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/04/18 21:22:59 UTC
[kafka] branch trunk updated: MINOR: Fix TestDowngrade.test_upgrade_and_downgrade (#12027)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6d36487b68 MINOR: Fix TestDowngrade.test_upgrade_and_downgrade (#12027)
6d36487b68 is described below
commit 6d36487b684fd41522cccd4da4fd88f0b89ff0b7
Author: David Jacot <dj...@confluent.io>
AuthorDate: Mon Apr 18 23:22:33 2022 +0200
MINOR: Fix TestDowngrade.test_upgrade_and_downgrade (#12027)
The second validation does not verify the second bounce because the verified producer and the verified consumer are stopped in `self.run_validation`. This means that the second `run_validation` just spits out the same information as the first one. Instead, we should run the validation only once, at the end.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
tests/kafkatest/tests/core/downgrade_test.py | 5 +++--
tests/kafkatest/tests/end_to_end.py | 18 ++++++++++++------
2 files changed, 15 insertions(+), 8 deletions(-)
diff --git a/tests/kafkatest/tests/core/downgrade_test.py b/tests/kafkatest/tests/core/downgrade_test.py
index 772ec96d6c..8fb6fd5c44 100644
--- a/tests/kafkatest/tests/core/downgrade_test.py
+++ b/tests/kafkatest/tests/core/downgrade_test.py
@@ -139,14 +139,15 @@ class TestDowngrade(EndToEndTest):
self.logger.info("First pass bounce - rolling upgrade")
self.upgrade_from(kafka_version)
- self.run_validation()
+ self.await_consumed_records(min_records=5000)
upgrade_topic_id = self.kafka.topic_id(self.topic)
assert start_topic_id == upgrade_topic_id
self.logger.info("Second pass bounce - rolling downgrade")
+ num_records_acked = self.producer.num_acked
self.downgrade_to(kafka_version)
- self.run_validation()
+ self.run_validation(min_records=num_records_acked+5000)
downgrade_topic_id = self.kafka.topic_id(self.topic)
assert upgrade_topic_id == downgrade_topic_id
diff --git a/tests/kafkatest/tests/end_to_end.py b/tests/kafkatest/tests/end_to_end.py
index 533056540b..3b9cf9f8c3 100644
--- a/tests/kafkatest/tests/end_to_end.py
+++ b/tests/kafkatest/tests/end_to_end.py
@@ -87,7 +87,13 @@ class EndToEndTest(Test):
self.last_consumed_offsets[partition] = offset
self.records_consumed.append(record_id)
- def await_consumed_offsets(self, last_acked_offsets, timeout_sec):
+ def await_produced_records(self, min_records, timeout_sec=30):
+ wait_until(lambda: self.producer.num_acked > min_records,
+ timeout_sec=timeout_sec,
+ err_msg="Producer failed to produce messages for %ds." %\
+ timeout_sec)
+
+ def await_consumed_offsets(self, last_acked_offsets, timeout_sec=30):
def has_finished_consuming():
for partition, offset in last_acked_offsets.items():
if not partition in self.last_consumed_offsets:
@@ -102,6 +108,10 @@ class EndToEndTest(Test):
err_msg="Consumer failed to consume up to offsets %s after waiting %ds." %\
(str(last_acked_offsets), timeout_sec))
+ def await_consumed_records(self, min_records, producer_timeout_sec=30,
+ consumer_timeout_sec=30):
+ self.await_produced_records(min_records=min_records)
+ self.await_consumed_offsets(self.producer.last_acked_offsets)
def _collect_all_logs(self):
for s in self.test_context.services:
@@ -120,11 +130,7 @@ class EndToEndTest(Test):
def run_validation(self, min_records=5000, producer_timeout_sec=30,
consumer_timeout_sec=30, enable_idempotence=False):
try:
- wait_until(lambda: self.producer.num_acked > min_records,
- timeout_sec=producer_timeout_sec,
- err_msg="Producer failed to produce messages for %ds." %\
- producer_timeout_sec)
-
+ self.await_produced_records(min_records, producer_timeout_sec)
self.logger.info("Stopping producer after writing up to offsets %s" %\
str(self.producer.last_acked_offsets))
self.producer.stop()