You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/18 23:08:46 UTC

[kafka] branch 2.0 updated: KAFKA-7773; Add end to end system test relying on verifiable consumer (#6070)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 3de3fb2  KAFKA-7773; Add end to end system test relying on verifiable consumer (#6070)
3de3fb2 is described below

commit 3de3fb27865b319d0f21ff84b33ece31c4d2e72e
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Jan 8 06:14:51 2019 -0800

    KAFKA-7773; Add end to end system test relying on verifiable consumer (#6070)
    
    This commit creates an EndToEndTest base class which relies on the verifiable consumer. This will ultimately replace ProduceConsumeValidateTest which depends on the console consumer. The advantage is that the verifiable consumer exposes more information to use for validation. It also allows for a nicer shutdown pattern. Rather than relying on the console consumer idle timeout, which requires a minimum wait time, we can halt consumption after we have reached the last acked offsets. Thi [...]
---
 tests/kafkatest/services/kafka/kafka.py            |   4 +
 .../services/kafka/templates/kafka.properties      |   1 +
 tests/kafkatest/services/verifiable_consumer.py    |   8 +-
 tests/kafkatest/services/verifiable_producer.py    |   9 ++
 tests/kafkatest/tests/core/replication_test.py     |  64 ++++-----
 tests/kafkatest/tests/core/security_test.py        |  61 +++------
 tests/kafkatest/tests/end_to_end.py                | 151 +++++++++++++++++++++
 tests/kafkatest/tests/produce_consume_validate.py  |  68 ++--------
 tests/kafkatest/utils/__init__.py                  |   2 +-
 tests/kafkatest/utils/util.py                      |  56 ++++++++
 10 files changed, 291 insertions(+), 133 deletions(-)

diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 3d198b1..283bbe5 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -521,6 +521,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
         return missing
 
+    def restart_cluster(self, clean_shutdown=True):
+        for node in self.nodes:
+            self.restart_node(node, clean_shutdown=clean_shutdown)
+
     def restart_node(self, node, clean_shutdown=True):
         """Restart the given node."""
         self.stop_node(node, clean_shutdown)
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 8cca14f..19deee3 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -62,6 +62,7 @@ replica.lag.time.max.ms={{replica_lag}}
 {% if auto_create_topics_enable is defined and auto_create_topics_enable is not none %}
 auto.create.topics.enable={{ auto_create_topics_enable }}
 {% endif %}
+offsets.topic.num.partitions={{ num_nodes }}
 offsets.topic.replication.factor={{ 3 if num_nodes > 3 else num_nodes }}
 # Set to a low, but non-zero value to exercise this path without making tests much slower
 group.initial.rebalance.delay.ms=100
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 95970d9..c0e186f 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -161,7 +161,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
     def __init__(self, context, num_nodes, kafka, topic, group_id,
                  max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
                  assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor",
-                 version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO"):
+                 version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO",
+                 on_record_consumed=None):
         super(VerifiableConsumer, self).__init__(context, num_nodes)
         self.log_level = log_level
         
@@ -174,6 +175,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         self.assignment_strategy = assignment_strategy
         self.prop_file = ""
         self.stop_timeout_sec = stop_timeout_sec
+        self.on_record_consumed = on_record_consumed
 
         self.event_handlers = {}
         self.global_position = {}
@@ -223,6 +225,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
                     elif name == "records_consumed":
                         handler.handle_records_consumed(event)
                         self._update_global_position(event, node)
+                    elif name == "record_data" and self.on_record_consumed:
+                        self.on_record_consumed(event, node)
                     elif name == "partitions_revoked":
                         handler.handle_partitions_revoked(event)
                     elif name == "partitions_assigned":
@@ -263,6 +267,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableConsumer.LOG4J_CONFIG
         cmd += self.impl.exec_cmd(node)
+        if self.on_record_consumed:
+            cmd += " --verbose"
         cmd += " --group-id %s --topic %s --broker-list %s --session-timeout %s --assignment-strategy %s %s" % \
                (self.group_id, self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol),
                self.session_timeout_sec*1000, self.assignment_strategy, "--enable-autocommit" if self.enable_autocommit else "")
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index cbce27e..5c7152d 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -20,6 +20,7 @@ import time
 from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.services.background_thread import BackgroundThreadService
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.services.kafka import TopicPartition
 from kafkatest.services.verifiable_client import VerifiableClientMixin
 from kafkatest.utils import is_int, is_int_with_prefix
 from kafkatest.version import DEV_BRANCH
@@ -84,6 +85,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         for node in self.nodes:
             node.version = version
         self.acked_values = []
+        self._last_acked_offsets = {}
         self.not_acked_values = []
         self.produced_count = {}
         self.clean_shutdown_nodes = set()
@@ -156,7 +158,9 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
                         self.produced_count[idx] += 1
 
                     elif data["name"] == "producer_send_success":
+                        partition = TopicPartition(data["topic"], data["partition"])
                         self.acked_values.append(self.message_validator(data["value"]))
+                        self._last_acked_offsets[partition] = data["offset"]
                         self.produced_count[idx] += 1
 
                         # Log information if there is a large gap between successively acknowledged messages
@@ -218,6 +222,11 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         return len(self.pids(node)) > 0
 
     @property
+    def last_acked_offsets(self):
+        with self.lock:
+            return self._last_acked_offsets
+
+    @property
     def acked(self):
         with self.lock:
             return self.acked_values
diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py
index c16d679..f5c6422 100644
--- a/tests/kafkatest/tests/core/replication_test.py
+++ b/tests/kafkatest/tests/core/replication_test.py
@@ -19,12 +19,7 @@ from ducktape.mark import matrix
 from ducktape.mark import parametrize
 from ducktape.mark.resource import cluster
 
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
+from kafkatest.tests.end_to_end import EndToEndTest
 
 import signal
 
@@ -83,7 +78,7 @@ failures = {
 }
 
 
-class ReplicationTest(ProduceConsumeValidateTest):
+class ReplicationTest(EndToEndTest):
     """
     Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
     (foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
@@ -98,25 +93,16 @@ class ReplicationTest(ProduceConsumeValidateTest):
     indicator that nothing is left to consume.
     """
 
+    TOPIC_CONFIG = {
+        "partitions": 3,
+        "replication-factor": 3,
+        "configs": {"min.insync.replicas": 2}
+    }
+ 
     def __init__(self, test_context):
         """:type test_context: ducktape.tests.test.TestContext"""
-        super(ReplicationTest, self).__init__(test_context=test_context)
-
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk,
-                                  topics={self.topic: {
-                                      "partitions": 3,
-                                      "replication-factor": 3,
-                                      'configs': {"min.insync.replicas": 2}}
-                                  })
-        self.producer_throughput = 1000
-        self.num_producers = 1
-        self.num_consumers = 1
-
-    def setUp(self):
-        self.zk.start()
-
+        super(ReplicationTest, self).__init__(test_context=test_context, topic_config=self.TOPIC_CONFIG)
+ 
     def min_cluster_size(self):
         """Override this since we're adding services outside of the constructor"""
         return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
@@ -156,15 +142,23 @@ class ReplicationTest(ProduceConsumeValidateTest):
             - Validate that every acked message was consumed
         """
 
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = security_protocol
-        self.kafka.client_sasl_mechanism = client_sasl_mechanism
-        self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
-        self.enable_idempotence = enable_idempotence
-        compression_types = None if not compression_type else [compression_type] * self.num_producers
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic,
-                                           throughput=self.producer_throughput, compression_types=compression_types,
-                                           enable_idempotence=enable_idempotence)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=60000, message_validator=is_int)
+        self.create_zookeeper()
+        self.zk.start()
+
+        self.create_kafka(num_nodes=3,
+                          security_protocol=security_protocol,
+                          interbroker_security_protocol=security_protocol,
+                          client_sasl_mechanism=client_sasl_mechanism,
+                          interbroker_sasl_mechanism=interbroker_sasl_mechanism)
         self.kafka.start()
-        self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self, broker_type))
+
+        compression_types = None if not compression_type else [compression_type]
+        self.create_producer(compression_types=compression_types, enable_idempotence=enable_idempotence)
+        self.producer.start()
+
+        self.create_consumer(log_level="DEBUG")
+        self.consumer.start()
+
+        self.await_startup()
+        failures[failure_mode](self, broker_type)
+        self.run_validation(enable_idempotence=enable_idempotence)
diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py
index 4edbcff..d62735a 100644
--- a/tests/kafkatest/tests/core/security_test.py
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -19,14 +19,9 @@ from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 from ducktape.errors import TimeoutError
 
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.services.security.security_config import SslStores
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
+from kafkatest.tests.end_to_end import EndToEndTest
 
 class TestSslStores(SslStores):
     def __init__(self, local_scratch_dir, valid_hostname=True):
@@ -41,7 +36,7 @@ class TestSslStores(SslStores):
         else:
             return "invalidhostname"
 
-class SecurityTest(ProduceConsumeValidateTest):
+class SecurityTest(EndToEndTest):
     """
     These tests validate security features.
     """
@@ -50,21 +45,6 @@ class SecurityTest(ProduceConsumeValidateTest):
         """:type test_context: ducktape.tests.test.TestContext"""
         super(SecurityTest, self).__init__(test_context=test_context)
 
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
-                                                                    "partitions": 2,
-                                                                    "replication-factor": 1}
-                                                                })
-        self.num_partitions = 2
-        self.timeout_sec = 10000
-        self.producer_throughput = 1000
-        self.num_producers = 1
-        self.num_consumers = 1
-
-    def setUp(self):
-        self.zk.start()
-
     def producer_consumer_have_expected_error(self, error):
         try:
             for node in self.producer.nodes:
@@ -87,16 +67,19 @@ class SecurityTest(ProduceConsumeValidateTest):
         with hostname verification failure. Hence clients are expected to fail with LEADER_NOT_AVAILABLE.
         """
 
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = interbroker_security_protocol
-        SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir, valid_hostname=False)
+        SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir,
+                                                  valid_hostname=False)
+
+        self.create_zookeeper()
+        self.zk.start()
 
+        self.create_kafka(security_protocol=security_protocol,
+                          interbroker_security_protocol=interbroker_security_protocol)
         self.kafka.start()
-        self.create_producer_and_consumer()
-        self.producer.log_level = "TRACE"
 
-        self.producer.start()
-        self.consumer.start()
+        # We need more verbose logging to catch the expected errors
+        self.create_and_start_clients(log_level="DEBUG")
+
         try:
             wait_until(lambda: self.producer.num_acked > 0, timeout_sec=5)
 
@@ -108,20 +91,18 @@ class SecurityTest(ProduceConsumeValidateTest):
             pass
 
         error = 'SSLHandshakeException' if security_protocol == 'SSL' else 'LEADER_NOT_AVAILABLE'
-        wait_until(lambda: self.producer_consumer_have_expected_error(error), timeout_sec=5)
-
+        wait_until(lambda: self.producer_consumer_have_expected_error(error), timeout_sec=30)
         self.producer.stop()
         self.consumer.stop()
-        self.producer.log_level = "INFO"
 
         SecurityConfig.ssl_stores.valid_hostname = True
-        for node in self.kafka.nodes:
-            self.kafka.restart_node(node, clean_shutdown=True)
-
-        self.create_producer_and_consumer()
-        self.run_produce_consume_validate()
+        self.kafka.restart_cluster()
+        self.create_and_start_clients(log_level="INFO")
+        self.run_validation()
 
-    def create_producer_and_consumer(self):
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=10000, message_validator=is_int)
+    def create_and_start_clients(self, log_level):
+        self.create_producer(log_level=log_level)
+        self.producer.start()
 
+        self.create_consumer(log_level=log_level)
+        self.consumer.start()
diff --git a/tests/kafkatest/tests/end_to_end.py b/tests/kafkatest/tests/end_to_end.py
new file mode 100644
index 0000000..9cc6b41
--- /dev/null
+++ b/tests/kafkatest/tests/end_to_end.py
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import TopicPartition
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.verifiable_consumer import VerifiableConsumer
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.utils import validate_delivery
+
+import time
+
+class EndToEndTest(Test):
+    """This class provides a shared template for tests which follow the common pattern of:
+
+        - produce to a topic in the background
+        - consume from that topic in the background
+        - run some logic, e.g. fail topic leader etc.
+        - perform validation
+    """
+
+    DEFAULT_TOPIC_CONFIG = {"partitions": 2, "replication-factor": 1}
+
+    def __init__(self, test_context, topic="test_topic", topic_config=DEFAULT_TOPIC_CONFIG):
+        super(EndToEndTest, self).__init__(test_context=test_context)
+        self.topic = topic
+        self.topic_config = topic_config
+        self.records_consumed = []
+        self.last_consumed_offsets = {}
+        
+    def create_zookeeper(self, num_nodes=1, **kwargs):
+        self.zk = ZookeeperService(self.test_context, num_nodes=num_nodes, **kwargs)
+
+    def create_kafka(self, num_nodes=1, **kwargs):
+        group_metadata_config = {
+            "partitions": num_nodes,
+            "replication-factor": min(num_nodes, 3),
+            "configs": {"cleanup.policy": "compact"}
+        }
+
+        topics = {
+            self.topic: self.topic_config,
+            "__consumer_offsets": group_metadata_config
+        }
+        self.kafka = KafkaService(self.test_context, num_nodes=num_nodes,
+                                  zk=self.zk, topics=topics, **kwargs)
+
+    def create_consumer(self, num_nodes=1, group_id="test_group", **kwargs):
+        self.consumer = VerifiableConsumer(self.test_context,
+                                           num_nodes=num_nodes,
+                                           kafka=self.kafka,
+                                           topic=self.topic,
+                                           group_id=group_id,
+                                           on_record_consumed=self.on_record_consumed,
+                                           **kwargs)
+                                    
+
+    def create_producer(self, num_nodes=1, throughput=1000, **kwargs):
+        self.producer = VerifiableProducer(self.test_context,
+                                           num_nodes=num_nodes,
+                                           kafka=self.kafka,
+                                           topic=self.topic,
+                                           throughput=throughput,
+                                           **kwargs)
+
+    def on_record_consumed(self, record, node):
+        partition = TopicPartition(record["topic"], record["partition"])
+        record_id = int(record["value"])
+        offset = record["offset"]
+        self.last_consumed_offsets[partition] = offset
+        self.records_consumed.append(record_id)
+
+    def await_consumed_offsets(self, last_acked_offsets, timeout_sec):
+        def has_finished_consuming():
+            for partition, offset in last_acked_offsets.iteritems():
+                if not partition in self.last_consumed_offsets:
+                    return False
+                if self.last_consumed_offsets[partition] < offset:
+                    return False
+            return True
+
+        wait_until(has_finished_consuming,
+                   timeout_sec=timeout_sec,
+                   err_msg="Consumer failed to consume up to offsets %s after waiting %ds." %\
+                   (str(last_acked_offsets), timeout_sec))
+
+
+    def _collect_all_logs(self):
+        for s in self.test_context.services:
+            self.mark_for_collect(s)
+
+    def await_startup(self, min_records=5, timeout_sec=30):
+        try:
+            wait_until(lambda: self.consumer.total_consumed() >= min_records,
+                       timeout_sec=timeout_sec,
+                       err_msg="Timed out after %ds while awaiting initial record delivery of %d records" %\
+                       (timeout_sec, min_records))
+        except BaseException:
+            self._collect_all_logs()
+            raise
+
+    def run_validation(self, min_records=5000, producer_timeout_sec=30,
+                       consumer_timeout_sec=30, enable_idempotence=False):
+        try:
+            wait_until(lambda: self.producer.num_acked > min_records,
+                       timeout_sec=producer_timeout_sec,
+                       err_msg="Producer failed to produce messages for %ds." %\
+                       producer_timeout_sec)
+
+            self.logger.info("Stopping producer after writing up to offsets %s" %\
+                         str(self.producer.last_acked_offsets))
+            self.producer.stop()
+
+            self.await_consumed_offsets(self.producer.last_acked_offsets, consumer_timeout_sec)
+            self.consumer.stop()
+            
+            self.validate(enable_idempotence)
+        except BaseException:
+            self._collect_all_logs()
+            raise
+
+    def validate(self, enable_idempotence):
+        self.logger.info("Number of acked records: %d" % len(self.producer.acked))
+        self.logger.info("Number of consumed records: %d" % len(self.records_consumed))
+
+        def check_lost_data(missing_records):
+            return self.kafka.search_data_files(self.topic, missing_records)
+
+        succeeded, error_msg = validate_delivery(self.producer.acked, self.records_consumed,
+                                                 enable_idempotence, check_lost_data)
+
+        # Collect all logs if validation fails
+        if not succeeded:
+            self._collect_all_logs()
+
+        assert succeeded, error_msg
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index 7a78da3..e49d02e 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -15,6 +15,9 @@
 
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
+
+from kafkatest.utils import validate_delivery
+
 import time
 
 class ProduceConsumeValidateTest(Test):
@@ -115,68 +118,21 @@ class ProduceConsumeValidateTest(Test):
                 self.mark_for_collect(s)
             raise
 
-    @staticmethod
-    def annotate_missing_msgs(missing, acked, consumed, msg):
-        missing_list = list(missing)
-        msg += "%s acked message did not make it to the Consumer. They are: " %\
-            len(missing_list)
-        if len(missing_list) < 20:
-            msg += str(missing_list) + ". "
-        else:
-            msg += ", ".join(str(m) for m in missing_list[:20])
-            msg += "...plus %s more. Total Acked: %s, Total Consumed: %s. " \
-                   % (len(missing_list) - 20, len(set(acked)), len(set(consumed)))
-        return msg
-
-    @staticmethod
-    def annotate_data_lost(data_lost, msg, number_validated):
-        print_limit = 10
-        if len(data_lost) > 0:
-            msg += "The first %s missing messages were validated to ensure they are in Kafka's data files. " \
-                   "%s were missing. This suggests data loss. Here are some of the messages not found in the data files: %s\n" \
-                   % (number_validated, len(data_lost), str(data_lost[0:print_limit]) if len(data_lost) > print_limit else str(data_lost))
-        else:
-            msg += "We validated that the first %s of these missing messages correctly made it into Kafka's data files. " \
-                   "This suggests they were lost on their way to the consumer." % number_validated
-        return msg
-
     def validate(self):
-        """Check that each acked message was consumed."""
-        success = True
-        msg = ""
-        acked = self.producer.acked
-        consumed = self.consumer.messages_consumed[1]
-        # Correctness of the set difference operation depends on using equivalent message_validators in procuder and consumer
-        missing = set(acked) - set(consumed)
-
-        self.logger.info("num consumed:  %d" % len(consumed))
-
-        # Were all acked messages consumed?
-        if len(missing) > 0:
-            msg = self.annotate_missing_msgs(missing, acked, consumed, msg)
-            success = False
-
-            #Did we miss anything due to data loss?
-            to_validate = list(missing)[0:1000 if len(missing) > 1000 else len(missing)]
-            data_lost = self.kafka.search_data_files(self.topic, to_validate)
-            msg = self.annotate_data_lost(data_lost, msg, len(to_validate))
+        messages_consumed = self.consumer.messages_consumed[1]
 
+        self.logger.info("Number of acked records: %d" % len(self.producer.acked))
+        self.logger.info("Number of consumed records: %d" % len(messages_consumed))
 
-        if self.enable_idempotence:
-            self.logger.info("Ran a test with idempotence enabled. We expect no duplicates")
-        else:
-            self.logger.info("Ran a test with idempotence disabled.")
+        def check_lost_data(missing_records):
+            return self.kafka.search_data_files(self.topic, missing_records)
 
-        # Are there duplicates?
-        if len(set(consumed)) != len(consumed):
-            num_duplicates = abs(len(set(consumed)) - len(consumed))
-            msg += "(There are also %s duplicate messages in the log - but that is an acceptable outcome)\n" % num_duplicates
-            if self.enable_idempotence:
-                assert False, "Detected %s duplicates even though idempotence was enabled." % num_duplicates
+        succeeded, error_msg = validate_delivery(self.producer.acked, messages_consumed,
+                                                 self.enable_idempotence, check_lost_data)
 
         # Collect all logs if validation fails
-        if not success:
+        if not succeeded:
             for s in self.test_context.services:
                 self.mark_for_collect(s)
 
-        assert success, msg
+        assert succeeded, error_msg
diff --git a/tests/kafkatest/utils/__init__.py b/tests/kafkatest/utils/__init__.py
index 8c473bf..1c1d5e0 100644
--- a/tests/kafkatest/utils/__init__.py
+++ b/tests/kafkatest/utils/__init__.py
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from util import kafkatest_version, is_version, is_int, is_int_with_prefix, node_is_reachable
+from util import kafkatest_version, is_version, is_int, is_int_with_prefix, node_is_reachable, validate_delivery
diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py
index dd20273..b9ccaf8 100644
--- a/tests/kafkatest/utils/util.py
+++ b/tests/kafkatest/utils/util.py
@@ -112,3 +112,59 @@ def node_is_reachable(src_node, dst_node):
     :return:                True only if dst is reachable from src.
     """
     return 0 == src_node.account.ssh("nc -w 3 -z %s 22" % dst_node.account.hostname, allow_fail=True)
+
+
+def annotate_missing_msgs(missing, acked, consumed, msg):
+    missing_list = list(missing)
+    msg += "%s acked message did not make it to the Consumer. They are: " %\
+        len(missing_list)
+    if len(missing_list) < 20:
+        msg += str(missing_list) + ". "
+    else:
+        msg += ", ".join(str(m) for m in missing_list[:20])
+        msg += "...plus %s more. Total Acked: %s, Total Consumed: %s. " \
+            % (len(missing_list) - 20, len(set(acked)), len(set(consumed)))
+    return msg
+
+def annotate_data_lost(data_lost, msg, number_validated):
+    print_limit = 10
+    if len(data_lost) > 0:
+        msg += "The first %s missing messages were validated to ensure they are in Kafka's data files. " \
+            "%s were missing. This suggests data loss. Here are some of the messages not found in the data files: %s\n" \
+            % (number_validated, len(data_lost), str(data_lost[0:print_limit]) if len(data_lost) > print_limit else str(data_lost))
+    else:
+        msg += "We validated that the first %s of these missing messages correctly made it into Kafka's data files. " \
+            "This suggests they were lost on their way to the consumer." % number_validated
+    return msg
+
+def validate_delivery(acked, consumed, idempotence_enabled=False, check_lost_data=None):
+    """Check that each acked message was consumed."""
+    success = True
+    msg = ""
+
+    # Correctness of the set difference operation depends on using equivalent
+    # message_validators in producer and consumer
+    missing = set(acked) - set(consumed)
+    
+    # Were all acked messages consumed?
+    if len(missing) > 0:
+        msg = annotate_missing_msgs(missing, acked, consumed, msg)
+        success = False
+        
+        # Did we miss anything due to data loss?
+        if check_lost_data:
+            to_validate = list(missing)[0:1000 if len(missing) > 1000 else len(missing)]
+            data_lost = check_lost_data(to_validate)
+            msg = annotate_data_lost(data_lost, msg, len(to_validate))
+
+    # Are there duplicates?
+    if len(set(consumed)) != len(consumed):
+        num_duplicates = abs(len(set(consumed)) - len(consumed))
+
+        if idempotence_enabled:
+            success = False
+            msg += "Detected %d duplicates even though idempotence was enabled.\n" % num_duplicates
+        else:
+            msg += "(There are also %d duplicate messages in the log - but that is an acceptable outcome)\n" % num_duplicates
+
+    return success, msg