You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2020/03/06 20:07:48 UTC

[kafka] branch 2.5 updated: KAFKA-9662: Wait for consumer offset reset in throttle test to avoid losing early messages (#8227)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 573a149  KAFKA-9662: Wait for consumer offset reset in throttle test to avoid losing early messages (#8227)
573a149 is described below

commit 573a149acc84f8ae3ae88e25c6c066d2944e7838
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Mar 6 19:50:22 2020 +0000

    KAFKA-9662: Wait for consumer offset reset in throttle test to avoid losing early messages (#8227)
---
 tests/kafkatest/services/console_consumer.py      | 17 +++++++++++++++++
 tests/kafkatest/tests/produce_consume_validate.py |  5 +++++
 2 files changed, 22 insertions(+)

diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 0811bcd..df34015 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -16,7 +16,9 @@
 import itertools
 import os
 
+from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.services.background_thread import BackgroundThreadService
+from ducktape.utils.util import wait_until
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.monitor.jmx import JmxMixin
@@ -296,3 +298,18 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
 
     def java_class_name(self):
         return "ConsoleConsumer"
+
+    def has_log_message(self, node, message):
+        try:
+            node.account.ssh("grep '%s' %s" % (message, ConsoleConsumer.LOG_FILE))
+        except RemoteCommandError:
+            return False
+        return True
+
+    def wait_for_offset_reset(self, node, topic, num_partitions):
+        for partition in range(num_partitions):
+            message = "Resetting offset for partition %s-%d" % (topic, partition)
+            wait_until(lambda: self.has_log_message(node, message),
+                       timeout_sec=60,
+                       err_msg="Offset not reset for partition %s-%d" % (topic, partition))
+
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index 22aa096..c691cbc 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -59,6 +59,11 @@ class ProduceConsumeValidateTest(Test):
                        err_msg="Consumer process took more than %d s to fork" %\
                        self.consumer_init_timeout_sec)
 
+        # If consuming only latest messages, wait for offset reset to ensure all messages are consumed
+        if not self.consumer.from_beginning:
+            self.consumer.wait_for_offset_reset(self.consumer.nodes[0], self.topic, self.num_partitions)
+
+
         self.producer.start()
         wait_until(lambda: self.producer.num_acked > 5,
                    timeout_sec=self.producer_start_timeout_sec,