Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/21 00:46:45 UTC
kafka git commit: KAFKA-2812: improve consumer integration tests
Repository: kafka
Updated Branches:
refs/heads/trunk 0d68eb73f -> b16817a54
KAFKA-2812: improve consumer integration tests
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Geoff Anderson
Closes #500 from hachikuji/KAFKA-2812
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b16817a5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b16817a5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b16817a5
Branch: refs/heads/trunk
Commit: b16817a54c592eefc5a462132f45c5b4f786d5f1
Parents: 0d68eb7
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Nov 20 15:46:42 2015 -0800
Committer: Guozhang Wang <gu...@Guozhang-Macbook.local>
Committed: Fri Nov 20 15:46:42 2015 -0800
----------------------------------------------------------------------
tests/kafkatest/services/verifiable_consumer.py | 249 +++++++++++++------
tests/kafkatest/tests/consumer_test.py | 224 ++++++++++++++---
2 files changed, 370 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b16817a5/tests/kafkatest/services/verifiable_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 7d76166..eec46d7 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -20,12 +20,96 @@ from kafkatest.services.kafka.version import TRUNK
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.services.kafka import TopicPartition
-from collections import namedtuple
import json
import os
+import signal
import subprocess
import time
-import signal
+
+class ConsumerState:
+ Dead = 1
+ Rebalancing = 3
+ Joined = 2
+
+class ConsumerEventHandler(object):
+
+ def __init__(self, node):
+ self.node = node
+ self.state = ConsumerState.Dead
+ self.revoked_count = 0
+ self.assigned_count = 0
+ self.assignment = []
+ self.position = {}
+ self.committed = {}
+ self.total_consumed = 0
+
+ def handle_shutdown_complete(self):
+ self.state = ConsumerState.Dead
+ self.assignment = []
+ self.position = {}
+
+ def handle_offsets_committed(self, event):
+ if event["success"]:
+ for offset_commit in event["offsets"]:
+ topic = offset_commit["topic"]
+ partition = offset_commit["partition"]
+ tp = TopicPartition(topic, partition)
+ offset = offset_commit["offset"]
+ assert tp in self.assignment, "Committed offsets for a partition not assigned"
+ assert self.position[tp] >= offset, "The committed offset was greater than the current position"
+ self.committed[tp] = offset
+
+ def handle_records_consumed(self, event):
+ assert self.state == ConsumerState.Joined, "Consumed records should only be received when joined"
+
+ for record_batch in event["partitions"]:
+ tp = TopicPartition(topic=record_batch["topic"],
+ partition=record_batch["partition"])
+ min_offset = record_batch["minOffset"]
+ max_offset = record_batch["maxOffset"]
+
+ assert tp in self.assignment, "Consumed records for a partition not assigned"
+ assert tp not in self.position or self.position[tp] == min_offset, \
+ "Consumed from an unexpected offset (%s, %s)" % (str(self.position[tp]), str(min_offset))
+ self.position[tp] = max_offset + 1
+
+ self.total_consumed += event["count"]
+
+ def handle_partitions_revoked(self, event):
+ self.revoked_count += 1
+ self.state = ConsumerState.Rebalancing
+ self.position = {}
+
+ def handle_partitions_assigned(self, event):
+ self.assigned_count += 1
+ self.state = ConsumerState.Joined
+ assignment = []
+ for topic_partition in event["partitions"]:
+ topic = topic_partition["topic"]
+ partition = topic_partition["partition"]
+ assignment.append(TopicPartition(topic, partition))
+ self.assignment = assignment
+
+ def handle_kill_process(self, clean_shutdown):
+ # if the shutdown was clean, then we expect the explicit
+ # shutdown event from the consumer
+ if not clean_shutdown:
+ self.handle_shutdown_complete()
+
+ def current_assignment(self):
+ return list(self.assignment)
+
+ def current_position(self, tp):
+ if tp in self.position:
+ return self.position[tp]
+ else:
+ return None
+
+ def last_commit(self, tp):
+ if tp in self.committed:
+ return self.committed[tp]
+ else:
+ return None
class VerifiableConsumer(BackgroundThreadService):
PERSISTENT_ROOT = "/mnt/verifiable_consumer"
@@ -49,7 +133,8 @@ class VerifiableConsumer(BackgroundThreadService):
}
def __init__(self, context, num_nodes, kafka, topic, group_id,
- max_messages=-1, session_timeout=30000, version=TRUNK):
+ max_messages=-1, session_timeout=30000, enable_autocommit=False,
+ version=TRUNK):
super(VerifiableConsumer, self).__init__(context, num_nodes)
self.log_level = "TRACE"
@@ -58,23 +143,23 @@ class VerifiableConsumer(BackgroundThreadService):
self.group_id = group_id
self.max_messages = max_messages
self.session_timeout = session_timeout
+ self.enable_autocommit = enable_autocommit
+ self.prop_file = ""
+ self.security_config = kafka.security_config.client_config(self.prop_file)
+ self.prop_file += str(self.security_config)
- self.assignment = {}
- self.joined = set()
- self.total_records = 0
- self.consumed_positions = {}
- self.committed_offsets = {}
- self.revoked_count = 0
- self.assigned_count = 0
+ self.event_handlers = {}
+ self.global_position = {}
+ self.global_committed = {}
for node in self.nodes:
node.version = version
- self.prop_file = ""
- self.security_config = kafka.security_config.client_config(self.prop_file)
- self.prop_file += str(self.security_config)
-
def _worker(self, idx, node):
+ if node not in self.event_handlers:
+ self.event_handlers[node] = ConsumerEventHandler(node)
+
+ handler = self.event_handlers[node]
node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)
# Create and upload log properties
@@ -86,7 +171,6 @@ class VerifiableConsumer(BackgroundThreadService):
self.logger.info(self.prop_file)
node.account.create_file(VerifiableConsumer.CONFIG_FILE, self.prop_file)
self.security_config.setup_node(node)
-
cmd = self.start_cmd(node)
self.logger.debug("VerifiableConsumer %d command: %s" % (idx, cmd))
@@ -96,51 +180,42 @@ class VerifiableConsumer(BackgroundThreadService):
with self.lock:
name = event["name"]
if name == "shutdown_complete":
- self._handle_shutdown_complete(node)
+ handler.handle_shutdown_complete()
if name == "offsets_committed":
- self._handle_offsets_committed(node, event)
+ handler.handle_offsets_committed(event)
+ self._update_global_committed(event)
elif name == "records_consumed":
- self._handle_records_consumed(node, event)
+ handler.handle_records_consumed(event)
+ self._update_global_position(event)
elif name == "partitions_revoked":
- self._handle_partitions_revoked(node, event)
+ handler.handle_partitions_revoked(event)
elif name == "partitions_assigned":
- self._handle_partitions_assigned(node, event)
-
- def _handle_shutdown_complete(self, node):
- if node in self.joined:
- self.joined.remove(node)
-
- def _handle_offsets_committed(self, node, event):
- if event["success"]:
- for offset_commit in event["offsets"]:
- topic = offset_commit["topic"]
- partition = offset_commit["partition"]
- tp = TopicPartition(topic, partition)
- self.committed_offsets[tp] = offset_commit["offset"]
-
- def _handle_records_consumed(self, node, event):
- for topic_partition in event["partitions"]:
- topic = topic_partition["topic"]
- partition = topic_partition["partition"]
- tp = TopicPartition(topic, partition)
- self.consumed_positions[tp] = topic_partition["maxOffset"] + 1
- self.total_records += event["count"]
-
- def _handle_partitions_revoked(self, node, event):
- self.revoked_count += 1
- self.assignment[node] = []
- if node in self.joined:
- self.joined.remove(node)
-
- def _handle_partitions_assigned(self, node, event):
- self.assigned_count += 1
- self.joined.add(node)
- assignment =[]
- for topic_partition in event["partitions"]:
- topic = topic_partition["topic"]
- partition = topic_partition["partition"]
- assignment.append(TopicPartition(topic, partition))
- self.assignment[node] = assignment
+ handler.handle_partitions_assigned(event)
+
+ def _update_global_position(self, consumed_event):
+ for consumed_partition in consumed_event["partitions"]:
+ tp = TopicPartition(consumed_partition["topic"], consumed_partition["partition"])
+ if tp in self.global_committed:
+ # verify that the position never gets behind the current commit.
+ assert self.global_committed[tp] <= consumed_partition["minOffset"], \
+ "Consumed position %d is behind the current committed offset %d" % (consumed_partition["minOffset"], self.global_committed[tp])
+
+ # the consumer cannot generally guarantee that the position increases monotonically
+ # without gaps in the face of hard failures, so we only log a warning when this happens
+ if tp in self.global_position and self.global_position[tp] != consumed_partition["minOffset"]:
+ self.logger.warn("Expected next consumed offset of %d, but instead saw %d" %
+ (self.global_position[tp], consumed_partition["minOffset"]))
+
+ self.global_position[tp] = consumed_partition["maxOffset"] + 1
+
+ def _update_global_committed(self, commit_event):
+ if commit_event["success"]:
+ for offset_commit in commit_event["offsets"]:
+ tp = TopicPartition(offset_commit["topic"], offset_commit["partition"])
+ offset = offset_commit["offset"]
+ assert self.global_position[tp] >= offset, \
+ "committed offset is ahead of the current partition"
+ self.global_committed[tp] = offset
def start_cmd(self, node):
cmd = ""
@@ -148,14 +223,14 @@ class VerifiableConsumer(BackgroundThreadService):
cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableConsumer.LOG4J_CONFIG
cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer" \
- " --group-id %s --topic %s --broker-list %s --session-timeout %s" % \
- (self.group_id, self.topic, self.kafka.bootstrap_servers(), self.session_timeout)
+ " --group-id %s --topic %s --broker-list %s --session-timeout %s %s" % \
+ (self.group_id, self.topic, self.kafka.bootstrap_servers(), self.session_timeout,
+ "--enable-autocommit" if self.enable_autocommit else "")
if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)
cmd += " --consumer.config %s" % VerifiableConsumer.CONFIG_FILE
cmd += " 2>> %s | tee -a %s &" % (VerifiableConsumer.STDOUT_CAPTURE, VerifiableConsumer.STDOUT_CAPTURE)
- print(cmd)
return cmd
def pids(self, node):
@@ -174,6 +249,10 @@ class VerifiableConsumer(BackgroundThreadService):
self.logger.debug("Could not parse as json: %s" % str(string))
return None
+ def stop_all(self):
+ for node in self.nodes:
+ self.stop_node(node)
+
def kill_node(self, node, clean_shutdown=True, allow_fail=False):
if clean_shutdown:
sig = signal.SIGTERM
@@ -182,11 +261,10 @@ class VerifiableConsumer(BackgroundThreadService):
for pid in self.pids(node):
node.account.signal(pid, sig, allow_fail)
- if not clean_shutdown:
- self._handle_shutdown_complete(node)
+ self.event_handlers[node].handle_kill_process(clean_shutdown)
- def stop_node(self, node, clean_shutdown=True, allow_fail=False):
- self.kill_node(node, clean_shutdown, allow_fail)
+ def stop_node(self, node, clean_shutdown=True):
+ self.kill_node(node, clean_shutdown=clean_shutdown)
if self.worker_threads is None:
return
@@ -203,20 +281,47 @@ class VerifiableConsumer(BackgroundThreadService):
def current_assignment(self):
with self.lock:
- return self.assignment
+ return { handler.node: handler.current_assignment() for handler in self.event_handlers.itervalues() }
- def position(self, tp):
+ def current_position(self, tp):
with self.lock:
- return self.consumed_positions[tp]
+ if tp in self.global_position:
+ return self.global_position[tp]
+ else:
+ return None
def owner(self, tp):
+ for handler in self.event_handlers.itervalues():
+ if tp in handler.current_assignment():
+ return handler.node
+ return None
+
+ def last_commit(self, tp):
with self.lock:
- for node, assignment in self.assignment.iteritems():
- if tp in assignment:
- return node
- return None
+ if tp in self.global_committed:
+ return self.global_committed[tp]
+ else:
+ return None
- def committed(self, tp):
+ def total_consumed(self):
with self.lock:
- return self.committed_offsets[tp]
+ return sum(handler.total_consumed for handler in self.event_handlers.itervalues())
+ def num_rebalances(self):
+ with self.lock:
+ return max(handler.assigned_count for handler in self.event_handlers.itervalues())
+
+ def joined_nodes(self):
+ with self.lock:
+ return [handler.node for handler in self.event_handlers.itervalues()
+ if handler.state == ConsumerState.Joined]
+
+ def rebalancing_nodes(self):
+ with self.lock:
+ return [handler.node for handler in self.event_handlers.itervalues()
+ if handler.state == ConsumerState.Rebalancing]
+
+ def dead_nodes(self):
+ with self.lock:
+ return [handler.node for handler in self.event_handlers.itervalues()
+ if handler.state == ConsumerState.Dead]
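
The refactoring above moves all per-node bookkeeping into ConsumerEventHandler and exposes it
through aggregate helpers on VerifiableConsumer (joined_nodes, dead_nodes, rebalancing_nodes,
total_consumed, current_position, last_commit, num_rebalances, stop_all). The following is a
minimal sketch, not part of the patch, of how a ducktape test might drive that API; the topic
name, group id, and node count are illustrative assumptions.

# Sketch only: topic, group id, and node count below are assumptions, not taken from this commit.
from kafkatest.services.kafka import TopicPartition
from kafkatest.services.verifiable_consumer import VerifiableConsumer

def example_usage(test_context, kafka):
    partition = TopicPartition("example_topic", 0)
    consumer = VerifiableConsumer(test_context, 2, kafka, "example_topic", "example_group",
                                  session_timeout=30000, enable_autocommit=False)
    consumer.start()

    # membership queries are derived from each node's ConsumerEventHandler state
    joined = consumer.joined_nodes()        # nodes currently in ConsumerState.Joined
    dead = consumer.dead_nodes()            # nodes currently in ConsumerState.Dead

    # aggregate progress across the group
    total = consumer.total_consumed()                # sum of per-node consumed counts
    position = consumer.current_position(partition)  # None until records have been consumed
    committed = consumer.last_commit(partition)      # None until an offset commit has succeeded

    consumer.stop_all()
    return joined, dead, total, position, committed
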
http://git-wip-us.apache.org/repos/asf/kafka/blob/b16817a5/tests/kafkatest/tests/consumer_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/consumer_test.py b/tests/kafkatest/tests/consumer_test.py
index 707ad2f..1b80470 100644
--- a/tests/kafkatest/tests/consumer_test.py
+++ b/tests/kafkatest/tests/consumer_test.py
@@ -23,15 +23,15 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.services.kafka import TopicPartition
+import signal
+
def partitions_for(topic, num_partitions):
partitions = set()
for i in range(num_partitions):
partitions.add(TopicPartition(topic=topic, partition=i))
return partitions
-
class VerifiableConsumerTest(KafkaTest):
-
STOPIC = "simple_topic"
TOPIC = "test_topic"
NUM_PARTITIONS = 3
@@ -61,23 +61,160 @@ class VerifiableConsumerTest(KafkaTest):
partitions = self._partitions(assignment)
return len(partitions) == self.NUM_PARTITIONS and set(partitions) == self.PARTITIONS
- def _setup_consumer(self, topic):
+ def _setup_consumer(self, topic, enable_autocommit=False):
return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka,
- topic, self.GROUP_ID, session_timeout=self.session_timeout)
+ topic, self.GROUP_ID, session_timeout=self.session_timeout,
+ enable_autocommit=enable_autocommit)
def _setup_producer(self, topic, max_messages=-1):
- return VerifiableProducer(self.test_context, self.num_producers,
- self.kafka, topic, max_messages=max_messages)
+ return VerifiableProducer(self.test_context, self.num_producers, self.kafka, topic,
+ max_messages=max_messages, throughput=500)
def _await_all_members(self, consumer):
# Wait until all members have joined the group
- wait_until(lambda: len(consumer.joined) == self.num_consumers, timeout_sec=20,
+ wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers, timeout_sec=self.session_timeout+5,
err_msg="Consumers failed to join in a reasonable amount of time")
- def test_consumer_failure(self):
+ def rolling_bounce_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+ for _ in range(num_bounces):
+ for node in consumer.nodes:
+ consumer.stop_node(node, clean_shutdown)
+
+ wait_until(lambda: len(consumer.dead_nodes()) == (self.num_consumers - 1), timeout_sec=self.session_timeout,
+ err_msg="Timed out waiting for the consumers to shutdown")
+
+ total_consumed = consumer.total_consumed()
+
+ consumer.start_node(node)
+
+ wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers and consumer.total_consumed() > total_consumed,
+ timeout_sec=self.session_timeout,
+ err_msg="Timed out waiting for the consumers to shutdown")
+
+ def bounce_all_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+ for _ in range(num_bounces):
+ for node in consumer.nodes:
+ consumer.stop_node(node, clean_shutdown)
+
+ wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, timeout_sec=10,
+ err_msg="Timed out waiting for the consumers to shutdown")
+
+ total_consumed = consumer.total_consumed()
+
+ for node in consumer.nodes:
+ consumer.start_node(node)
+
+ wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers and consumer.total_consumed() > total_consumed,
+ timeout_sec=self.session_timeout*2,
+ err_msg="Timed out waiting for the consumers to shutdown")
+
+ def rolling_bounce_brokers(self, consumer, num_bounces=5, clean_shutdown=True):
+ for _ in range(num_bounces):
+ for node in self.kafka.nodes:
+ total_consumed = consumer.total_consumed()
+
+ self.kafka.restart_node(node, clean_shutdown=True)
+
+ wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers and consumer.total_consumed() > total_consumed,
+ timeout_sec=30,
+ err_msg="Timed out waiting for the broker to shutdown")
+
+ def bounce_all_brokers(self, consumer, num_bounces=5, clean_shutdown=True):
+ for _ in range(num_bounces):
+ for node in self.kafka.nodes:
+ self.kafka.stop_node(node)
+
+ for node in self.kafka.nodes:
+ self.kafka.start_node(node)
+
+
+ def test_broker_rolling_bounce(self):
+ """
+ Verify correct consumer behavior when the brokers are consecutively restarted.
+
+ Setup: single Kafka cluster with one producer writing messages to a single topic with one
+ partition, and a set of consumers in the same group reading from the same topic.
+
+ - Start a producer which continues producing new messages throughout the test.
+ - Start up the consumers and wait until they've joined the group.
+ - In a loop, restart each broker consecutively, waiting for the group to stabilize between
+ each broker restart.
+ - Verify delivery semantics according to the failure type and that the broker bounces
+ did not cause unexpected group rebalances.
+ """
partition = TopicPartition(self.STOPIC, 0)
+ producer = self._setup_producer(self.STOPIC)
consumer = self._setup_consumer(self.STOPIC)
+
+ producer.start()
+ wait_until(lambda: producer.num_acked > 1000, timeout_sec=10,
+ err_msg="Producer failed waiting for messages to be written")
+
+ consumer.start()
+ self._await_all_members(consumer)
+
+ num_rebalances = consumer.num_rebalances()
+ # TODO: make this test work with hard shutdowns, which probably requires
+ # pausing before the node is restarted to ensure that any ephemeral
+ # nodes have time to expire
+ self.rolling_bounce_brokers(consumer, clean_shutdown=True)
+
+ unexpected_rebalances = consumer.num_rebalances() - num_rebalances
+ assert unexpected_rebalances == 0, \
+ "Broker rolling bounce caused %d unexpected group rebalances" % unexpected_rebalances
+
+ consumer.stop_all()
+
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+
+ @matrix(clean_shutdown=[True, False], bounce_mode=["all", "rolling"])
+ def test_consumer_bounce(self, clean_shutdown, bounce_mode):
+ """
+ Verify correct consumer behavior when the consumers in the group are consecutively restarted.
+
+ Setup: single Kafka cluster with one producer and a set of consumers in one group.
+
+ - Start a producer which continues producing new messages throughout the test.
+ - Start up the consumers and wait until they've joined the group.
+ - In a loop, restart each consumer, waiting for each one to rejoin the group before
+ restarting the rest.
+ - Verify delivery semantics according to the failure type.
+ """
+ partition = TopicPartition(self.STOPIC, 0)
+
+ producer = self._setup_producer(self.STOPIC)
+ consumer = self._setup_consumer(self.STOPIC)
+
+ producer.start()
+ wait_until(lambda: producer.num_acked > 1000, timeout_sec=10,
+ err_msg="Producer failed waiting for messages to be written")
+
+ consumer.start()
+ self._await_all_members(consumer)
+
+ if bounce_mode == "all":
+ self.bounce_all_consumers(consumer, clean_shutdown=clean_shutdown)
+ else:
+ self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
+
+ consumer.stop_all()
+ if clean_shutdown:
+ # if the total records consumed matches the current position, we haven't seen any duplicates
+ # this can only be guaranteed with a clean shutdown
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+ else:
+ # we may have duplicates in a hard failure
+ assert consumer.current_position(partition) <= consumer.total_consumed(), \
+ "Current position greater than the total number of consumed records"
+
+ @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
+ def test_consumer_failure(self, clean_shutdown, enable_autocommit):
+ partition = TopicPartition(self.STOPIC, 0)
+
+ consumer = self._setup_consumer(self.STOPIC, enable_autocommit=enable_autocommit)
producer = self._setup_producer(self.STOPIC)
consumer.start()
@@ -88,46 +225,72 @@ class VerifiableConsumerTest(KafkaTest):
# startup the producer and ensure that some records have been written
producer.start()
- wait_until(lambda: producer.num_acked > 1000, timeout_sec=20,
+ wait_until(lambda: producer.num_acked > 1000, timeout_sec=10,
err_msg="Producer failed waiting for messages to be written")
# stop the partition owner and await its shutdown
- consumer.kill_node(partition_owner, clean_shutdown=True)
- wait_until(lambda: len(consumer.joined) == 1, timeout_sec=20,
- err_msg="Timed out waiting for consumer to close")
+ consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown)
+ wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers - 1) and consumer.owner(partition) != None,
+ timeout_sec=self.session_timeout+5, err_msg="Timed out waiting for the group to rebalance after the consumer was stopped")
# ensure that the remaining consumer does some work after rebalancing
- current_total_records = consumer.total_records
- wait_until(lambda: consumer.total_records > current_total_records + 1000, timeout_sec=20,
+ current_total_consumed = consumer.total_consumed()
+ wait_until(lambda: consumer.total_consumed() > current_total_consumed + 1000, timeout_sec=10,
err_msg="Timed out waiting for additional records to be consumed after first consumer failed")
- # if the total records consumed matches the current position,
- # we haven't seen any duplicates
- assert consumer.position(partition) == consumer.total_records
- assert consumer.committed(partition) <= consumer.total_records
+ consumer.stop_all()
- def test_broker_failure(self):
+ if clean_shutdown:
+ # if the total records consumed matches the current position, we haven't seen any duplicates
+ # this can only be guaranteed with a clean shutdown
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+ else:
+ # we may have duplicates in a hard failure
+ assert consumer.current_position(partition) <= consumer.total_consumed(), \
+ "Current position greater than the total number of consumed records"
+
+ # if autocommit is not turned on, we can also verify the last committed offset
+ if not enable_autocommit:
+ assert consumer.last_commit(partition) == consumer.current_position(partition), \
+ "Last committed offset did not match last consumed position"
+
+
+ @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
+ def test_broker_failure(self, clean_shutdown, enable_autocommit):
partition = TopicPartition(self.STOPIC, 0)
- consumer = self._setup_consumer(self.STOPIC)
+ consumer = self._setup_consumer(self.STOPIC, enable_autocommit=enable_autocommit)
producer = self._setup_producer(self.STOPIC)
producer.start()
consumer.start()
self._await_all_members(consumer)
+ num_rebalances = consumer.num_rebalances()
+
# shutdown one of the brokers
- self.kafka.signal_node(self.kafka.nodes[0])
+ # TODO: we need a way to target the coordinator instead of picking arbitrarily
+ self.kafka.signal_node(self.kafka.nodes[0], signal.SIGTERM if clean_shutdown else signal.SIGKILL)
- # ensure that the remaining consumer does some work after broker failure
- current_total_records = consumer.total_records
- wait_until(lambda: consumer.total_records > current_total_records + 1000, timeout_sec=20,
+ # ensure that the consumers do some work after the broker failure
+ current_total_consumed = consumer.total_consumed()
+ wait_until(lambda: consumer.total_consumed() > current_total_consumed + 1000, timeout_sec=20,
err_msg="Timed out waiting for additional records to be consumed after first consumer failed")
- # if the total records consumed matches the current position,
- # we haven't seen any duplicates
- assert consumer.position(partition) == consumer.total_records
- assert consumer.committed(partition) <= consumer.total_records
+ # verify that there were no rebalances on failover
+ assert num_rebalances == consumer.num_rebalances(), "Broker failure should not cause a rebalance"
+
+ consumer.stop_all()
+
+ # if the total records consumed matches the current position, we haven't seen any duplicates
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+
+ # if autocommit is not turned on, we can also verify the last committed offset
+ if not enable_autocommit:
+ assert consumer.last_commit(partition) == consumer.current_position(partition), \
+ "Last committed offset did not match last consumed position"
def test_simple_consume(self):
total_records = 1000
@@ -144,14 +307,13 @@ class VerifiableConsumerTest(KafkaTest):
wait_until(lambda: producer.num_acked == total_records, timeout_sec=20,
err_msg="Producer failed waiting for messages to be written")
- wait_until(lambda: consumer.committed(partition) == total_records, timeout_sec=10,
+ wait_until(lambda: consumer.last_commit(partition) == total_records, timeout_sec=10,
err_msg="Consumer failed to read all expected messages")
- assert consumer.position(partition) == total_records
+ assert consumer.current_position(partition) == total_records
def test_valid_assignment(self):
consumer = self._setup_consumer(self.TOPIC)
consumer.start()
self._await_all_members(consumer)
assert self._valid_assignment(consumer.current_assignment())
-
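
For reference, test_valid_assignment relies on the _valid_assignment/_partitions helpers, which
are only partially visible in the hunks above (the body of _partitions is not shown). The
following is a minimal sketch of the check they perform, with the missing helper reconstructed
as an assumption rather than copied from the patch.

# Sketch only: _partitions is not shown in this diff, so its behavior here is an assumption.
from kafkatest.services.kafka import TopicPartition

def partitions_for(topic, num_partitions):
    # expected partition set for the topic (mirrors the helper added at the top of consumer_test.py)
    return set(TopicPartition(topic=topic, partition=i) for i in range(num_partitions))

def valid_assignment(topic, num_partitions, assignment):
    # assignment: dict of node -> list of TopicPartition, as returned by
    # VerifiableConsumer.current_assignment()
    assigned = [tp for partitions in assignment.values() for tp in partitions]
    return len(assigned) == num_partitions and set(assigned) == partitions_for(topic, num_partitions)

# example: three partitions of "test_topic" spread across two consumers is a valid assignment
example = {"consumer-1": [TopicPartition("test_topic", 0)],
           "consumer-2": [TopicPartition("test_topic", 1), TopicPartition("test_topic", 2)]}
assert valid_assignment("test_topic", 3, example)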