You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/02/13 02:28:21 UTC
[kafka] branch trunk updated: MINOR: Add a new system test for
resilience (#4560)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 962bc63 MINOR: Add a new system test for resilience (#4560)
962bc63 is described below
commit 962bc638f9c2ab249e5008a587ee78e3ba35fcb9
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Mon Feb 12 18:28:18 2018 -0800
MINOR: Add a new system test for resilience (#4560)
* Rolling kill-restart Streams instances with brokers unavailable temporarily, and validate that the streams can still complete the restart process
Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../streams/streams_broker_down_resilience_test.py | 42 ++++++++++++++++++++--
1 file changed, 40 insertions(+), 2 deletions(-)
diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index 7a0560d..add6247 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -180,10 +180,10 @@ class StreamsBrokerDownResilience(Test):
processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
processor_3.start()
- # need to wait for rebalance once
+ # need to wait for rebalance once
self.wait_for_verification(processor_3, "State transition from REBALANCING to RUNNING", processor_3.LOG_FILE)
- # assert streams can process when starting with broker down
+ # assert streams can process when starting with broker up
self.assert_produce_consume("waiting for rebalance to complete", num_messages=9)
message = "processed3messages"
@@ -210,3 +210,41 @@ class StreamsBrokerDownResilience(Test):
self.kafka.stop()
+ def test_streams_should_failover_while_brokers_down(self):
+ self.kafka.start()
+
+ configs = self.get_configs(extra_configs=",application.id=failover_with_broker_down")
+
+ processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
+ processor.start()
+
+ processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
+ processor_2.start()
+
+ processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
+ processor_3.start()
+
+ # need to wait for rebalance once
+ self.wait_for_verification(processor_3, "State transition from REBALANCING to RUNNING", processor_3.LOG_FILE)
+
+ # assert streams can process when starting with broker up
+ self.assert_produce_consume("waiting for rebalance to complete", num_messages=9)
+
+ message = "processed3messages"
+
+ self.wait_for_verification(processor, message, processor.STDOUT_FILE)
+ self.wait_for_verification(processor_2, message, processor_2.STDOUT_FILE)
+ self.wait_for_verification(processor_3, message, processor_3.STDOUT_FILE)
+
+ node = self.kafka.leader(self.inputTopic)
+ self.kafka.stop_node(node)
+
+ processor.abortThenRestart()
+ processor_2.abortThenRestart()
+ processor_3.abortThenRestart()
+
+ self.kafka.start_node(node)
+
+ self.assert_produce_consume("sending_message_after_hard_bouncing_streams_instance_bouncing_broker", num_messages=9)
+
+ self.kafka.stop()
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.