You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/02/13 02:28:21 UTC

[kafka] branch trunk updated: MINOR: Add a new system test for resilience (#4560)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 962bc63  MINOR: Add a new system test for resilience (#4560)
962bc63 is described below

commit 962bc638f9c2ab249e5008a587ee78e3ba35fcb9
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Mon Feb 12 18:28:18 2018 -0800

    MINOR: Add a new system test for resilience (#4560)
    
    * Perform a rolling kill-and-restart of Streams instances while brokers are temporarily unavailable, and validate that the Streams instances can still complete the restart process
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../streams/streams_broker_down_resilience_test.py | 42 ++++++++++++++++++++--
 1 file changed, 40 insertions(+), 2 deletions(-)

diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index 7a0560d..add6247 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -180,10 +180,10 @@ class StreamsBrokerDownResilience(Test):
         processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
         processor_3.start()
 
-        # need to wait for rebalance  once
+        # need to wait for rebalance once
         self.wait_for_verification(processor_3, "State transition from REBALANCING to RUNNING", processor_3.LOG_FILE)
 
-        # assert streams can process when starting with broker down
+        # assert streams can process when starting with broker up
         self.assert_produce_consume("waiting for rebalance to complete", num_messages=9)
 
         message = "processed3messages"
@@ -210,3 +210,41 @@ class StreamsBrokerDownResilience(Test):
 
         self.kafka.stop()
 
+    def test_streams_should_failover_while_brokers_down(self):
+        self.kafka.start()
+
+        configs = self.get_configs(extra_configs=",application.id=failover_with_broker_down")
+
+        processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
+        processor.start()
+
+        processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
+        processor_2.start()
+
+        processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
+        processor_3.start()
+
+        # need to wait for rebalance once
+        self.wait_for_verification(processor_3, "State transition from REBALANCING to RUNNING", processor_3.LOG_FILE)
+
+        # assert streams can process when starting with broker up
+        self.assert_produce_consume("waiting for rebalance to complete", num_messages=9)
+
+        message = "processed3messages"
+
+        self.wait_for_verification(processor, message, processor.STDOUT_FILE)
+        self.wait_for_verification(processor_2, message, processor_2.STDOUT_FILE)
+        self.wait_for_verification(processor_3, message, processor_3.STDOUT_FILE)
+
+        node = self.kafka.leader(self.inputTopic)
+        self.kafka.stop_node(node)
+
+        processor.abortThenRestart()
+        processor_2.abortThenRestart()
+        processor_3.abortThenRestart()
+
+        self.kafka.start_node(node)
+
+        self.assert_produce_consume("sending_message_after_hard_bouncing_streams_instance_bouncing_broker", num_messages=9)
+
+        self.kafka.stop()

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.