Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/21 00:46:45 UTC

kafka git commit: KAFKA-2812: improve consumer integration tests

Repository: kafka
Updated Branches:
  refs/heads/trunk 0d68eb73f -> b16817a54


KAFKA-2812: improve consumer integration tests

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Geoff Anderson

Closes #500 from hachikuji/KAFKA-2812
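
For reference, the service's start_cmd assembles an invocation along the lines of the sketch
below. The /opt path, group id, broker address, and consumer config filename are illustrative
placeholders (the real values come from kafka_dir(node), the test's GROUP_ID, bootstrap_servers(),
and the CONFIG_FILE constant); the new --enable-autocommit flag is appended only when the service
is constructed with enable_autocommit=True:

    /opt/kafka-trunk/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer \
        --group-id test_group_id \
        --topic simple_topic \
        --broker-list localhost:9092 \
        --session-timeout 30000 \
        --enable-autocommit \
        --consumer.config /mnt/verifiable_consumer/verifiable_consumer.properties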


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b16817a5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b16817a5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b16817a5

Branch: refs/heads/trunk
Commit: b16817a54c592eefc5a462132f45c5b4f786d5f1
Parents: 0d68eb7
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Nov 20 15:46:42 2015 -0800
Committer: Guozhang Wang <gu...@Guozhang-Macbook.local>
Committed: Fri Nov 20 15:46:42 2015 -0800

----------------------------------------------------------------------
 tests/kafkatest/services/verifiable_consumer.py | 249 +++++++++++++------
 tests/kafkatest/tests/consumer_test.py          | 224 ++++++++++++++---
 2 files changed, 370 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b16817a5/tests/kafkatest/services/verifiable_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 7d76166..eec46d7 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -20,12 +20,96 @@ from kafkatest.services.kafka.version import TRUNK
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.services.kafka import TopicPartition
 
-from collections import namedtuple
 import json
 import os
+import signal
 import subprocess
 import time
-import signal
+
+class ConsumerState:
+    Dead = 1
+    Rebalancing = 2
+    Joined = 3
+
+class ConsumerEventHandler(object):
+
+    def __init__(self, node):
+        self.node = node
+        self.state = ConsumerState.Dead
+        self.revoked_count = 0
+        self.assigned_count = 0
+        self.assignment = []
+        self.position = {}
+        self.committed = {}
+        self.total_consumed = 0
+
+    def handle_shutdown_complete(self):
+        self.state = ConsumerState.Dead
+        self.assignment = []
+        self.position = {}
+
+    def handle_offsets_committed(self, event):
+        if event["success"]:
+            for offset_commit in event["offsets"]:
+                topic = offset_commit["topic"]
+                partition = offset_commit["partition"]
+                tp = TopicPartition(topic, partition)
+                offset = offset_commit["offset"]
+                assert tp in self.assignment, "Committed offsets for a partition not assigned"
+                assert self.position[tp] >= offset, "The committed offset was greater than the current position"
+                self.committed[tp] = offset
+
+    def handle_records_consumed(self, event):
+        assert self.state == ConsumerState.Joined, "Consumed records should only be received when joined"
+
+        for record_batch in event["partitions"]:
+            tp = TopicPartition(topic=record_batch["topic"],
+                                partition=record_batch["partition"])
+            min_offset = record_batch["minOffset"]
+            max_offset = record_batch["maxOffset"]
+
+            assert tp in self.assignment, "Consumed records for a partition not assigned"
+            assert tp not in self.position or self.position[tp] == min_offset, \
+                "Consumed from an unexpected offset (%s, %s)" % (str(self.position[tp]), str(min_offset))
+            self.position[tp] = max_offset + 1 
+
+        self.total_consumed += event["count"]
+
+    def handle_partitions_revoked(self, event):
+        self.revoked_count += 1
+        self.state = ConsumerState.Rebalancing
+        self.position = {}
+
+    def handle_partitions_assigned(self, event):
+        self.assigned_count += 1
+        self.state = ConsumerState.Joined
+        assignment = []
+        for topic_partition in event["partitions"]:
+            topic = topic_partition["topic"]
+            partition = topic_partition["partition"]
+            assignment.append(TopicPartition(topic, partition))
+        self.assignment = assignment
+
+    def handle_kill_process(self, clean_shutdown):
+        # if the shutdown was clean, then we expect the explicit
+        # shutdown event from the consumer
+        if not clean_shutdown:
+            self.handle_shutdown_complete()
+
+    def current_assignment(self):
+        return list(self.assignment)
+
+    def current_position(self, tp):
+        if tp in self.position:
+            return self.position[tp]
+        else:
+            return None
+
+    def last_commit(self, tp):
+        if tp in self.committed:
+            return self.committed[tp]
+        else:
+            return None
 
 class VerifiableConsumer(BackgroundThreadService):
     PERSISTENT_ROOT = "/mnt/verifiable_consumer"
@@ -49,7 +133,8 @@ class VerifiableConsumer(BackgroundThreadService):
         }
 
     def __init__(self, context, num_nodes, kafka, topic, group_id,
-                 max_messages=-1, session_timeout=30000, version=TRUNK):
+                 max_messages=-1, session_timeout=30000, enable_autocommit=False,
+                 version=TRUNK):
         super(VerifiableConsumer, self).__init__(context, num_nodes)
         self.log_level = "TRACE"
         
@@ -58,23 +143,23 @@ class VerifiableConsumer(BackgroundThreadService):
         self.group_id = group_id
         self.max_messages = max_messages
         self.session_timeout = session_timeout
+        self.enable_autocommit = enable_autocommit
+        self.prop_file = ""
+        self.security_config = kafka.security_config.client_config(self.prop_file)
+        self.prop_file += str(self.security_config)
 
-        self.assignment = {}
-        self.joined = set()
-        self.total_records = 0
-        self.consumed_positions = {}
-        self.committed_offsets = {}
-        self.revoked_count = 0
-        self.assigned_count = 0
+        self.event_handlers = {}
+        self.global_position = {}
+        self.global_committed = {}
 
         for node in self.nodes:
             node.version = version
 
-        self.prop_file = ""
-        self.security_config = kafka.security_config.client_config(self.prop_file)
-        self.prop_file += str(self.security_config)
-
     def _worker(self, idx, node):
+        if node not in self.event_handlers:
+            self.event_handlers[node] = ConsumerEventHandler(node)
+
+        handler = self.event_handlers[node]
         node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)
 
         # Create and upload log properties
@@ -86,7 +171,6 @@ class VerifiableConsumer(BackgroundThreadService):
         self.logger.info(self.prop_file)
         node.account.create_file(VerifiableConsumer.CONFIG_FILE, self.prop_file)
         self.security_config.setup_node(node)
-
         cmd = self.start_cmd(node)
         self.logger.debug("VerifiableConsumer %d command: %s" % (idx, cmd))
 
@@ -96,51 +180,42 @@ class VerifiableConsumer(BackgroundThreadService):
                 with self.lock:
                     name = event["name"]
                     if name == "shutdown_complete":
-                        self._handle_shutdown_complete(node)
+                        handler.handle_shutdown_complete()
                     if name == "offsets_committed":
-                        self._handle_offsets_committed(node, event)
+                        handler.handle_offsets_committed(event)
+                        self._update_global_committed(event)
                     elif name == "records_consumed":
-                        self._handle_records_consumed(node, event)
+                        handler.handle_records_consumed(event)
+                        self._update_global_position(event)
                     elif name == "partitions_revoked":
-                        self._handle_partitions_revoked(node, event)
+                        handler.handle_partitions_revoked(event)
                     elif name == "partitions_assigned":
-                        self._handle_partitions_assigned(node, event)
-
-    def _handle_shutdown_complete(self, node):
-        if node in self.joined:
-            self.joined.remove(node)
-
-    def _handle_offsets_committed(self, node, event):
-        if event["success"]:
-            for offset_commit in event["offsets"]:
-                topic = offset_commit["topic"]
-                partition = offset_commit["partition"]
-                tp = TopicPartition(topic, partition)
-                self.committed_offsets[tp] = offset_commit["offset"]
-
-    def _handle_records_consumed(self, node, event):
-        for topic_partition in event["partitions"]:
-            topic = topic_partition["topic"]
-            partition = topic_partition["partition"]
-            tp = TopicPartition(topic, partition)
-            self.consumed_positions[tp] = topic_partition["maxOffset"] + 1
-        self.total_records += event["count"]
-
-    def _handle_partitions_revoked(self, node, event):
-        self.revoked_count += 1
-        self.assignment[node] = []
-        if node in self.joined:
-            self.joined.remove(node)
-
-    def _handle_partitions_assigned(self, node, event):
-        self.assigned_count += 1
-        self.joined.add(node)
-        assignment =[]
-        for topic_partition in event["partitions"]:
-            topic = topic_partition["topic"]
-            partition = topic_partition["partition"]
-            assignment.append(TopicPartition(topic, partition))
-        self.assignment[node] = assignment
+                        handler.handle_partitions_assigned(event)
+
+    def _update_global_position(self, consumed_event):
+        for consumed_partition in consumed_event["partitions"]:
+            tp = TopicPartition(consumed_partition["topic"], consumed_partition["partition"])
+            if tp in self.global_committed:
+                # verify that the position never gets behind the current commit.
+                assert self.global_committed[tp] <= consumed_partition["minOffset"], \
+                    "Consumed position %d is behind the current committed offset %d" % (consumed_partition["minOffset"], self.global_committed[tp])
+
+            # the consumer cannot generally guarantee that the position increases monotonically
+            # without gaps in the face of hard failures, so we only log a warning when this happens
+            if tp in self.global_position and self.global_position[tp] != consumed_partition["minOffset"]:
+                self.logger.warn("Expected next consumed offset of %d, but instead saw %d" %
+                                 (self.global_position[tp], consumed_partition["minOffset"]))
+
+            self.global_position[tp] = consumed_partition["maxOffset"] + 1
+
+    def _update_global_committed(self, commit_event):
+        if commit_event["success"]:
+            for offset_commit in commit_event["offsets"]:
+                tp = TopicPartition(offset_commit["topic"], offset_commit["partition"])
+                offset = offset_commit["offset"]
+                assert self.global_position[tp] >= offset, \
+                    "The committed offset is ahead of the current position"
+                self.global_committed[tp] = offset
 
     def start_cmd(self, node):
         cmd = ""
@@ -148,14 +223,14 @@ class VerifiableConsumer(BackgroundThreadService):
         cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableConsumer.LOG4J_CONFIG
         cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer" \
-              " --group-id %s --topic %s --broker-list %s --session-timeout %s" % \
-              (self.group_id, self.topic, self.kafka.bootstrap_servers(), self.session_timeout)
+              " --group-id %s --topic %s --broker-list %s --session-timeout %s %s" % \
+              (self.group_id, self.topic, self.kafka.bootstrap_servers(), self.session_timeout,
+               "--enable-autocommit" if self.enable_autocommit else "")
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
 
         cmd += " --consumer.config %s" % VerifiableConsumer.CONFIG_FILE
         cmd += " 2>> %s | tee -a %s &" % (VerifiableConsumer.STDOUT_CAPTURE, VerifiableConsumer.STDOUT_CAPTURE)
-        print(cmd)
         return cmd
 
     def pids(self, node):
@@ -174,6 +249,10 @@ class VerifiableConsumer(BackgroundThreadService):
             self.logger.debug("Could not parse as json: %s" % str(string))
             return None
 
+    def stop_all(self):
+        for node in self.nodes:
+            self.stop_node(node)
+
     def kill_node(self, node, clean_shutdown=True, allow_fail=False):
         if clean_shutdown:
             sig = signal.SIGTERM
@@ -182,11 +261,10 @@ class VerifiableConsumer(BackgroundThreadService):
         for pid in self.pids(node):
             node.account.signal(pid, sig, allow_fail)
 
-        if not clean_shutdown:
-            self._handle_shutdown_complete(node)
+        self.event_handlers[node].handle_kill_process(clean_shutdown)
 
-    def stop_node(self, node, clean_shutdown=True, allow_fail=False):
-        self.kill_node(node, clean_shutdown, allow_fail)
+    def stop_node(self, node, clean_shutdown=True):
+        self.kill_node(node, clean_shutdown=clean_shutdown)
         
         if self.worker_threads is None:
             return
@@ -203,20 +281,47 @@ class VerifiableConsumer(BackgroundThreadService):
 
     def current_assignment(self):
         with self.lock:
-            return self.assignment
+            return { handler.node: handler.current_assignment() for handler in self.event_handlers.itervalues() }
 
-    def position(self, tp):
+    def current_position(self, tp):
         with self.lock:
-            return self.consumed_positions[tp]
+            if tp in self.global_position:
+                return self.global_position[tp]
+            else:
+                return None
 
     def owner(self, tp):
+        for handler in self.event_handlers.itervalues():
+            if tp in handler.current_assignment():
+                return handler.node
+        return None
+
+    def last_commit(self, tp):
         with self.lock:
-            for node, assignment in self.assignment.iteritems():
-                if tp in assignment:
-                    return node
-            return None
+            if tp in self.global_committed:
+                return self.global_committed[tp]
+            else:
+                return None
 
-    def committed(self, tp):
+    def total_consumed(self):
         with self.lock:
-            return self.committed_offsets[tp]
+            return sum(handler.total_consumed for handler in self.event_handlers.itervalues())
 
+    def num_rebalances(self):
+        with self.lock:
+            return max(handler.assigned_count for handler in self.event_handlers.itervalues())
+
+    def joined_nodes(self):
+        with self.lock:
+            return [handler.node for handler in self.event_handlers.itervalues()
+                    if handler.state == ConsumerState.Joined]
+
+    def rebalancing_nodes(self):
+        with self.lock:
+            return [handler.node for handler in self.event_handlers.itervalues()
+                    if handler.state == ConsumerState.Rebalancing]
+
+    def dead_nodes(self):
+        with self.lock:
+            return [handler.node for handler in self.event_handlers.itervalues()
+                    if handler.state == ConsumerState.Dead]        

http://git-wip-us.apache.org/repos/asf/kafka/blob/b16817a5/tests/kafkatest/tests/consumer_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/consumer_test.py b/tests/kafkatest/tests/consumer_test.py
index 707ad2f..1b80470 100644
--- a/tests/kafkatest/tests/consumer_test.py
+++ b/tests/kafkatest/tests/consumer_test.py
@@ -23,15 +23,15 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.verifiable_consumer import VerifiableConsumer
 from kafkatest.services.kafka import TopicPartition
 
+import signal
+
 def partitions_for(topic, num_partitions):
     partitions = set()
     for i in range(num_partitions):
         partitions.add(TopicPartition(topic=topic, partition=i))
     return partitions
 
-
 class VerifiableConsumerTest(KafkaTest):
-
     STOPIC = "simple_topic"
     TOPIC = "test_topic"
     NUM_PARTITIONS = 3
@@ -61,23 +61,160 @@ class VerifiableConsumerTest(KafkaTest):
         partitions = self._partitions(assignment)
         return len(partitions) == self.NUM_PARTITIONS and set(partitions) == self.PARTITIONS
 
-    def _setup_consumer(self, topic):
+    def _setup_consumer(self, topic, enable_autocommit=False):
         return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka,
-                                  topic, self.GROUP_ID, session_timeout=self.session_timeout)
+                                  topic, self.GROUP_ID, session_timeout=self.session_timeout,
+                                  enable_autocommit=enable_autocommit)
 
     def _setup_producer(self, topic, max_messages=-1):
-        return VerifiableProducer(self.test_context, self.num_producers,
-                                  self.kafka, topic, max_messages=max_messages)
+        return VerifiableProducer(self.test_context, self.num_producers, self.kafka, topic,
+                                  max_messages=max_messages, throughput=500)
 
     def _await_all_members(self, consumer):
         # Wait until all members have joined the group
-        wait_until(lambda: len(consumer.joined) == self.num_consumers, timeout_sec=20,
+        wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers, timeout_sec=self.session_timeout+5,
                    err_msg="Consumers failed to join in a reasonable amount of time")
 
-    def test_consumer_failure(self):
+    def rolling_bounce_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+        for _ in range(num_bounces):
+            for node in consumer.nodes:
+                consumer.stop_node(node, clean_shutdown)
+
+                wait_until(lambda: len(consumer.dead_nodes()) == (self.num_consumers - 1), timeout_sec=self.session_timeout,
+                           err_msg="Timed out waiting for the consumers to shutdown")
+
+                total_consumed = consumer.total_consumed()
+            
+                consumer.start_node(node)
+
+                wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers and consumer.total_consumed() > total_consumed,
+                           timeout_sec=self.session_timeout,
+                           err_msg="Timed out waiting for the consumers to rejoin and resume consumption")
+
+    def bounce_all_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+        for _ in range(num_bounces):
+            for node in consumer.nodes:
+                consumer.stop_node(node, clean_shutdown)
+
+            wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, timeout_sec=10,
+                       err_msg="Timed out waiting for the consumers to shutdown")
+
+            total_consumed = consumer.total_consumed()
+            
+            for node in consumer.nodes:
+                consumer.start_node(node)
+
+            wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers and consumer.total_consumed() > total_consumed,
+                       timeout_sec=self.session_timeout*2,
+                       err_msg="Timed out waiting for the consumers to rejoin and resume consumption")
+
+    def rolling_bounce_brokers(self, consumer, num_bounces=5, clean_shutdown=True):
+        for _ in range(num_bounces):
+            for node in self.kafka.nodes:
+                total_consumed = consumer.total_consumed()
+
+                self.kafka.restart_node(node, clean_shutdown=True)
+
+                wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers and consumer.total_consumed() > total_consumed,
+                           timeout_sec=30,
+                           err_msg="Timed out waiting for the consumers to resume consumption after the broker restart")
+
+    def bounce_all_brokers(self, consumer, num_bounces=5, clean_shutdown=True):
+        for _ in range(num_bounces):
+            for node in self.kafka.nodes:
+                self.kafka.stop_node(node)
+
+            for node in self.kafka.nodes:
+                self.kafka.start_node(node)
+            
+
+    def test_broker_rolling_bounce(self):
+        """
+        Verify correct consumer behavior when the brokers are consecutively restarted.
+
+        Setup: single Kafka cluster with one producer writing messages to a single topic with one
+        partition, and a set of consumers in the same group reading from the same topic.
+
+        - Start a producer which continues producing new messages throughout the test.
+        - Start up the consumers and wait until they've joined the group.
+        - In a loop, restart each broker consecutively, waiting for the group to stabilize between
+          each broker restart.
+        - Verify delivery semantics according to the failure type and that the broker bounces
+          did not cause unexpected group rebalances.
+        """
         partition = TopicPartition(self.STOPIC, 0)
         
+        producer = self._setup_producer(self.STOPIC)
         consumer = self._setup_consumer(self.STOPIC)
+
+        producer.start()
+        wait_until(lambda: producer.num_acked > 1000, timeout_sec=10,
+                   err_msg="Producer failed waiting for messages to be written")
+
+        consumer.start()
+        self._await_all_members(consumer)
+
+        num_rebalances = consumer.num_rebalances()
+        # TODO: make this test work with hard shutdowns, which probably requires
+        #       pausing before the node is restarted to ensure that any ephemeral
+        #       nodes have time to expire
+        self.rolling_bounce_brokers(consumer, clean_shutdown=True)
+        
+        unexpected_rebalances = consumer.num_rebalances() - num_rebalances
+        assert unexpected_rebalances == 0, \
+            "Broker rolling bounce caused %d unexpected group rebalances" % unexpected_rebalances
+
+        consumer.stop_all()
+
+        assert consumer.current_position(partition) == consumer.total_consumed(), \
+            "Total consumed records did not match consumed position"
+
+    @matrix(clean_shutdown=[True, False], bounce_mode=["all", "rolling"])
+    def test_consumer_bounce(self, clean_shutdown, bounce_mode):
+        """
+        Verify correct consumer behavior when the consumers in the group are consecutively restarted.
+
+        Setup: single Kafka cluster with one producer and a set of consumers in one group.
+
+        - Start a producer which continues producing new messages throughout the test.
+        - Start up the consumers and wait until they've joined the group.
+        - In a loop, restart each consumer, waiting for each one to rejoin the group before
+          restarting the rest.
+        - Verify delivery semantics according to the failure type.
+        """
+        partition = TopicPartition(self.STOPIC, 0)
+        
+        producer = self._setup_producer(self.STOPIC)
+        consumer = self._setup_consumer(self.STOPIC)
+
+        producer.start()
+        wait_until(lambda: producer.num_acked > 1000, timeout_sec=10,
+                   err_msg="Producer failed waiting for messages to be written")
+
+        consumer.start()
+        self._await_all_members(consumer)
+
+        if bounce_mode == "all":
+            self.bounce_all_consumers(consumer, clean_shutdown=clean_shutdown)
+        else:
+            self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
+                
+        consumer.stop_all()
+        if clean_shutdown:
+            # if the total records consumed matches the current position, we haven't seen any duplicates
+            # this can only be guaranteed with a clean shutdown
+            assert consumer.current_position(partition) == consumer.total_consumed(), \
+                "Total consumed records did not match consumed position"
+        else:
+            # we may have duplicates in a hard failure
+            assert consumer.current_position(partition) <= consumer.total_consumed(), \
+                "Current position greater than the total number of consumed records"
+
+    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
+    def test_consumer_failure(self, clean_shutdown, enable_autocommit):
+        partition = TopicPartition(self.STOPIC, 0)
+        
+        consumer = self._setup_consumer(self.STOPIC, enable_autocommit=enable_autocommit)
         producer = self._setup_producer(self.STOPIC)
 
         consumer.start()
@@ -88,46 +225,72 @@ class VerifiableConsumerTest(KafkaTest):
 
         # startup the producer and ensure that some records have been written
         producer.start()
-        wait_until(lambda: producer.num_acked > 1000, timeout_sec=20,
+        wait_until(lambda: producer.num_acked > 1000, timeout_sec=10,
                    err_msg="Producer failed waiting for messages to be written")
 
         # stop the partition owner and await its shutdown
-        consumer.kill_node(partition_owner, clean_shutdown=True)
-        wait_until(lambda: len(consumer.joined) == 1, timeout_sec=20,
-                   err_msg="Timed out waiting for consumer to close")
+        consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown)
+        wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers - 1) and consumer.owner(partition) != None,
+                   timeout_sec=self.session_timeout+5, err_msg="Timed out waiting for consumer to close")
 
         # ensure that the remaining consumer does some work after rebalancing
-        current_total_records = consumer.total_records
-        wait_until(lambda: consumer.total_records > current_total_records + 1000, timeout_sec=20,
+        current_total_consumed = consumer.total_consumed()
+        wait_until(lambda: consumer.total_consumed() > current_total_consumed + 1000, timeout_sec=10,
                    err_msg="Timed out waiting for additional records to be consumed after first consumer failed")
 
-        # if the total records consumed matches the current position,
-        # we haven't seen any duplicates
-        assert consumer.position(partition) == consumer.total_records
-        assert consumer.committed(partition) <= consumer.total_records
+        consumer.stop_all()
 
-    def test_broker_failure(self):
+        if clean_shutdown:
+            # if the total records consumed matches the current position, we haven't seen any duplicates
+            # this can only be guaranteed with a clean shutdown
+            assert consumer.current_position(partition) == consumer.total_consumed(), \
+                "Total consumed records did not match consumed position"
+        else:
+            # we may have duplicates in a hard failure
+            assert consumer.current_position(partition) <= consumer.total_consumed(), \
+                "Current position greater than the total number of consumed records"
+
+        # if autocommit is not turned on, we can also verify the last committed offset
+        if not enable_autocommit:
+            assert consumer.last_commit(partition) == consumer.current_position(partition), \
+                "Last committed offset did not match last consumed position"
+
+
+    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
+    def test_broker_failure(self, clean_shutdown, enable_autocommit):
         partition = TopicPartition(self.STOPIC, 0)
         
-        consumer = self._setup_consumer(self.STOPIC)
+        consumer = self._setup_consumer(self.STOPIC, enable_autocommit=enable_autocommit)
         producer = self._setup_producer(self.STOPIC)
 
         producer.start()
         consumer.start()
         self._await_all_members(consumer)
 
+        num_rebalances = consumer.num_rebalances()
+
         # shutdown one of the brokers
-        self.kafka.signal_node(self.kafka.nodes[0])
+        # TODO: we need a way to target the coordinator instead of picking arbitrarily
+        self.kafka.signal_node(self.kafka.nodes[0], signal.SIGTERM if clean_shutdown else signal.SIGKILL)
 
-        # ensure that the remaining consumer does some work after broker failure
-        current_total_records = consumer.total_records
-        wait_until(lambda: consumer.total_records > current_total_records + 1000, timeout_sec=20,
+        # ensure that the consumers do some work after the broker failure
+        current_total_consumed = consumer.total_consumed()
+        wait_until(lambda: consumer.total_consumed() > current_total_consumed + 1000, timeout_sec=20,
                    err_msg="Timed out waiting for additional records to be consumed after first consumer failed")
 
-        # if the total records consumed matches the current position,
-        # we haven't seen any duplicates
-        assert consumer.position(partition) == consumer.total_records
-        assert consumer.committed(partition) <= consumer.total_records
+        # verify that there were no rebalances on failover
+        assert num_rebalances == consumer.num_rebalances(), "Broker failure should not cause a rebalance"
+
+        consumer.stop_all()
+
+        # if the total records consumed matches the current position, we haven't seen any duplicates
+        assert consumer.current_position(partition) == consumer.total_consumed(), \
+            "Total consumed records did not match consumed position"
+
+        # if autocommit is not turned on, we can also verify the last committed offset
+        if not enable_autocommit:
+            assert consumer.last_commit(partition) == consumer.current_position(partition), \
+                "Last committed offset did not match last consumed position"
 
     def test_simple_consume(self):
         total_records = 1000
@@ -144,14 +307,13 @@ class VerifiableConsumerTest(KafkaTest):
         wait_until(lambda: producer.num_acked == total_records, timeout_sec=20,
                    err_msg="Producer failed waiting for messages to be written")
 
-        wait_until(lambda: consumer.committed(partition) == total_records, timeout_sec=10,
+        wait_until(lambda: consumer.last_commit(partition) == total_records, timeout_sec=10,
                    err_msg="Consumer failed to read all expected messages")
 
-        assert consumer.position(partition) == total_records
+        assert consumer.current_position(partition) == total_records
 
     def test_valid_assignment(self):
         consumer = self._setup_consumer(self.TOPIC)
         consumer.start()
         self._await_all_members(consumer)
         assert self._valid_assignment(consumer.current_assignment())
-
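
For anyone who wants to exercise the updated tests, a minimal way to run them is sketched below,
assuming a working ducktape/Vagrant system-test environment as described in the project's
tests/README and ducktape's usual path::Class.test_method selector syntax (neither of which is
introduced by this patch):

    # run every test in the updated consumer test module
    ducktape tests/kafkatest/tests/consumer_test.py

    # run a single one of the new parametrized tests
    ducktape tests/kafkatest/tests/consumer_test.py::VerifiableConsumerTest.test_consumer_bounce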