You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2020/03/06 20:07:48 UTC
[kafka] branch 2.5 updated: KAFKA-9662: Wait for consumer offset
reset in throttle test to avoid losing early messages (#8227)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 573a149 KAFKA-9662: Wait for consumer offset reset in throttle test to avoid losing early messages (#8227)
573a149 is described below
commit 573a149acc84f8ae3ae88e25c6c066d2944e7838
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Mar 6 19:50:22 2020 +0000
KAFKA-9662: Wait for consumer offset reset in throttle test to avoid losing early messages (#8227)
---
tests/kafkatest/services/console_consumer.py | 17 +++++++++++++++++
tests/kafkatest/tests/produce_consume_validate.py | 5 +++++
2 files changed, 22 insertions(+)
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 0811bcd..df34015 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -16,7 +16,9 @@
import itertools
import os
+from ducktape.cluster.remoteaccount import RemoteCommandError
from ducktape.services.background_thread import BackgroundThreadService
+from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.monitor.jmx import JmxMixin
@@ -296,3 +298,18 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
def java_class_name(self):
return "ConsoleConsumer"
+
+ def has_log_message(self, node, message):
+ try:
+ node.account.ssh("grep '%s' %s" % (message, ConsoleConsumer.LOG_FILE))
+ except RemoteCommandError:
+ return False
+ return True
+
+ def wait_for_offset_reset(self, node, topic, num_partitions):
+ for partition in range(num_partitions):
+ message = "Resetting offset for partition %s-%d" % (topic, partition)
+ wait_until(lambda: self.has_log_message(node, message),
+ timeout_sec=60,
+ err_msg="Offset not reset for partition %s-%d" % (topic, partition))
+
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index 22aa096..c691cbc 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -59,6 +59,11 @@ class ProduceConsumeValidateTest(Test):
err_msg="Consumer process took more than %d s to fork" %\
self.consumer_init_timeout_sec)
+ # If consuming only latest messages, wait for offset reset to ensure all messages are consumed
+ if not self.consumer.from_beginning:
+ self.consumer.wait_for_offset_reset(self.consumer.nodes[0], self.topic, self.num_partitions)
+
+
self.producer.start()
wait_until(lambda: self.producer.num_acked > 5,
timeout_sec=self.producer_start_timeout_sec,