You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/12/20 01:53:19 UTC

[kafka] branch trunk updated: MINOR: Stabilization fixes broker down test trunk (#6043)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 40ca7dd  MINOR: Stabilization fixes broker down test trunk (#6043)
40ca7dd is described below

commit 40ca7ddeedaee6edf39ca941b13a79ad42c3e5b5
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed Dec 19 20:53:09 2018 -0500

    MINOR: Stabilization fixes broker down test trunk (#6043)
    
    This PR addresses a few causes of flakiness in this system test. It is a cherry-picked duplicate of #6041 for the trunk branch, so I won't repeat the inline comments here.
    
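    1. The log monitor must be acquired before performing a given operation (e.g. restarting a broker); otherwise the expected log signal can be emitted before monitoring starts and be missed.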
    2. The test relied too much on a timely rebalance and only sent a handful of messages.
    I've updated the test and run it here https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2143/, parameterized for 15 repeats; all passed.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 tests/kafkatest/tests/streams/base_streams_test.py |   1 +
 .../streams/streams_broker_down_resilience_test.py | 222 +++++++++++++++------
 2 files changed, 158 insertions(+), 65 deletions(-)

diff --git a/tests/kafkatest/tests/streams/base_streams_test.py b/tests/kafkatest/tests/streams/base_streams_test.py
index 320d4b2..6e005dd 100644
--- a/tests/kafkatest/tests/streams/base_streams_test.py
+++ b/tests/kafkatest/tests/streams/base_streams_test.py
@@ -45,6 +45,7 @@ class BaseStreamsTest(KafkaTest):
                                   topic,
                                   max_messages=num_messages,
                                   acks=1,
+                                  throughput=1000,
                                   repeating_keys=repeating_keys)
 
     def assert_produce_consume(self,
diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index 3cbf713..ee5feae 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -27,7 +27,9 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
     inputTopic = "streamsResilienceSource"
     outputTopic = "streamsResilienceSink"
     client_id = "streams-broker-resilience-verify-consumer"
-    num_messages = 5
+    num_messages = 10000
+    message = "processed[0-9]*messages"
+    connected_message = "Discovered group coordinator"
 
     def __init__(self, test_context):
         super(StreamsBrokerDownResilience, self).__init__(test_context,
@@ -48,8 +50,6 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, self.get_configs())
         processor.start()
 
-        # until KIP-91 is merged we'll only send 5 messages to assert Kafka Streams is running before taking the broker down
-        # After KIP-91 is merged we'll continue to send messages the duration of the test
         self.assert_produce_consume(self.inputTopic,
                                     self.outputTopic,
                                     self.client_id,
@@ -61,7 +61,11 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
 
         time.sleep(broker_down_time_in_seconds)
 
-        self.kafka.start_node(node)
+        with processor.node.account.monitor_log(processor.LOG_FILE) as monitor:
+            self.kafka.start_node(node)
+            monitor.wait_until(self.connected_message,
+                               timeout_sec=120,
+                               err_msg=("Never saw output '%s' on " % self.connected_message) + str(processor.node.account))
 
         self.assert_produce_consume(self.inputTopic,
                                     self.outputTopic,
@@ -95,22 +99,45 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         self.wait_for_verification(processor_2, broker_unavailable_message, processor_2.LOG_FILE, 10)
         self.wait_for_verification(processor_3, broker_unavailable_message, processor_3.LOG_FILE, 10)
 
-        # now start broker
-        self.kafka.start_node(node)
-
-        # assert streams can process when starting with broker down
-        self.assert_produce_consume(self.inputTopic,
-                                    self.outputTopic,
-                                    self.client_id,
-                                    "running_with_broker_down_initially",
-                                    num_messages=9,
-                                    timeout_sec=120)
-
-        message = "processed3messages"
-        # need to show all 3 instances processed messages
-        self.wait_for_verification(processor, message, processor.STDOUT_FILE)
-        self.wait_for_verification(processor_2, message, processor_2.STDOUT_FILE)
-        self.wait_for_verification(processor_3, message, processor_3.STDOUT_FILE)
+        with processor.node.account.monitor_log(processor.LOG_FILE) as monitor_1:
+            with processor_2.node.account.monitor_log(processor_2.LOG_FILE) as monitor_2:
+                with processor_3.node.account.monitor_log(processor_3.LOG_FILE) as monitor_3:
+                    self.kafka.start_node(node)
+
+                    monitor_1.wait_until(self.connected_message,
+                                         timeout_sec=120,
+                                         err_msg=("Never saw '%s' on " % self.connected_message) + str(processor.node.account))
+                    monitor_2.wait_until(self.connected_message,
+                                         timeout_sec=120,
+                                         err_msg=("Never saw '%s' on " % self.connected_message) + str(processor_2.node.account))
+                    monitor_3.wait_until(self.connected_message,
+                                         timeout_sec=120,
+                                         err_msg=("Never saw '%s' on " % self.connected_message) + str(processor_3.node.account))
+
+        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor_1:
+            with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2:
+                with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3:
+
+                    self.assert_produce(self.inputTopic,
+                                        "sending_message_after_broker_down_initially",
+                                        num_messages=self.num_messages,
+                                        timeout_sec=120)
+
+                    monitor_1.wait_until(self.message,
+                                         timeout_sec=120,
+                                         err_msg=("Never saw '%s' on " % self.message) + str(processor.node.account))
+                    monitor_2.wait_until(self.message,
+                                         timeout_sec=120,
+                                         err_msg=("Never saw '%s' on " % self.message) + str(processor_2.node.account))
+                    monitor_3.wait_until(self.message,
+                                         timeout_sec=120,
+                                         err_msg=("Never saw '%s' on " % self.message) + str(processor_3.node.account))
+
+                    self.assert_consume(self.client_id,
+                                        "consuming_message_after_broker_down_initially",
+                                        self.outputTopic,
+                                        num_messages=self.num_messages,
+                                        timeout_sec=120)
 
         self.kafka.stop()
 
@@ -126,24 +153,40 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         processor_2.start()
 
         processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
-        processor_3.start()
 
         # need to wait for rebalance once
-        self.wait_for_verification(processor_3, "State transition from REBALANCING to RUNNING", processor_3.LOG_FILE)
-
-        # assert streams can process when starting with broker up
-        self.assert_produce_consume(self.inputTopic,
-                                    self.outputTopic,
-                                    self.client_id,
-                                    "waiting for rebalance to complete",
-                                    num_messages=9,
-                                    timeout_sec=120)
-
-        message = "processed3messages"
-
-        self.wait_for_verification(processor, message, processor.STDOUT_FILE)
-        self.wait_for_verification(processor_2, message, processor_2.STDOUT_FILE)
-        self.wait_for_verification(processor_3, message, processor_3.STDOUT_FILE)
+        rebalance = "State transition from REBALANCING to RUNNING"
+        with processor_3.node.account.monitor_log(processor_3.LOG_FILE) as monitor:
+            processor_3.start()
+
+            monitor.wait_until(rebalance,
+                               timeout_sec=120,
+                               err_msg=("Never saw output '%s' on " % rebalance) + str(processor_3.node.account))
+
+        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor_1:
+            with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2:
+                with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3:
+
+                    self.assert_produce(self.inputTopic,
+                                        "sending_message_normal_broker_start",
+                                        num_messages=self.num_messages,
+                                        timeout_sec=120)
+
+                    monitor_1.wait_until(self.message,
+                                         timeout_sec=120,
+                                         err_msg=("Never saw '%s' on " % self.message) + str(processor.node.account))
+                    monitor_2.wait_until(self.message,
+                                         timeout_sec=120,
+                                         err_msg=("Never saw '%s' on " % self.message) + str(processor_2.node.account))
+                    monitor_3.wait_until(self.message,
+                                         timeout_sec=120,
+                                         err_msg=("Never saw '%s' on " % self.message) + str(processor_3.node.account))
+
+                    self.assert_consume(self.client_id,
+                                        "consuming_message_normal_broker_start",
+                                        self.outputTopic,
+                                        num_messages=self.num_messages,
+                                        timeout_sec=120)
 
         node = self.kafka.leader(self.inputTopic)
         self.kafka.stop_node(node)
@@ -155,17 +198,20 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         self.wait_for_verification(processor, shutdown_message, processor.STDOUT_FILE)
         self.wait_for_verification(processor_2, shutdown_message, processor_2.STDOUT_FILE)
 
-        self.kafka.start_node(node)
+        with processor_3.node.account.monitor_log(processor_3.LOG_FILE) as monitor_3:
+            self.kafka.start_node(node)
+
+            monitor_3.wait_until(self.connected_message,
+                                 timeout_sec=120,
+                                 err_msg=("Never saw '%s' on " % self.connected_message) + str(processor_3.node.account))
 
         self.assert_produce_consume(self.inputTopic,
                                     self.outputTopic,
                                     self.client_id,
                                     "sending_message_after_stopping_streams_instance_bouncing_broker",
-                                    num_messages=9,
+                                    num_messages=self.num_messages,
                                     timeout_sec=120)
 
-        self.wait_for_verification(processor_3, "processed9messages", processor_3.STDOUT_FILE)
-
         self.kafka.stop()
 
     def test_streams_should_failover_while_brokers_down(self):
@@ -180,24 +226,40 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         processor_2.start()
 
         processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
-        processor_3.start()
 
         # need to wait for rebalance once
-        self.wait_for_verification(processor_3, "State transition from REBALANCING to RUNNING", processor_3.LOG_FILE)
-
-        # assert streams can process when starting with broker up
-        self.assert_produce_consume(self.inputTopic,
-                                    self.outputTopic,
-                                    self.client_id,
-                                    "waiting for rebalance to complete",
-                                    num_messages=9,
-                                    timeout_sec=120)
-
-        message = "processed3messages"
-
-        self.wait_for_verification(processor, message, processor.STDOUT_FILE)
-        self.wait_for_verification(processor_2, message, processor_2.STDOUT_FILE)
-        self.wait_for_verification(processor_3, message, processor_3.STDOUT_FILE)
+        rebalance = "State transition from REBALANCING to RUNNING"
+        with processor_3.node.account.monitor_log(processor_3.LOG_FILE) as monitor:
+            processor_3.start()
+
+            monitor.wait_until(rebalance,
+                               timeout_sec=120,
+                               err_msg=("Never saw output '%s' on " % rebalance) + str(processor_3.node.account))
+
+        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor_1:
+            with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2:
+                with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3:
+
+                    self.assert_produce(self.inputTopic,
+                                        "sending_message_after_normal_broker_start",
+                                        num_messages=self.num_messages,
+                                        timeout_sec=120)
+
+                    monitor_1.wait_until(self.message,
+                                         timeout_sec=120,
+                                         err_msg=("Never saw '%s' on " % self.message) + str(processor.node.account))
+                    monitor_2.wait_until(self.message,
+                                         timeout_sec=120,
+                                         err_msg=("Never saw '%s' on " % self.message) + str(processor_2.node.account))
+                    monitor_3.wait_until(self.message,
+                                         timeout_sec=120,
+                                         err_msg=("Never saw '%s' on " % self.message) + str(processor_3.node.account))
+
+                    self.assert_consume(self.client_id,
+                                        "consuming_message_after_normal_broker_start",
+                                        self.outputTopic,
+                                        num_messages=self.num_messages,
+                                        timeout_sec=120)
 
         node = self.kafka.leader(self.inputTopic)
         self.kafka.stop_node(node)
@@ -206,13 +268,43 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         processor_2.abortThenRestart()
         processor_3.abortThenRestart()
 
-        self.kafka.start_node(node)
-
-        self.assert_produce_consume(self.inputTopic,
-                                    self.outputTopic,
-                                    self.client_id,
-                                    "sending_message_after_hard_bouncing_streams_instance_bouncing_broker",
-                                    num_messages=9,
-                                    timeout_sec=120)
-
+        with processor.node.account.monitor_log(processor.LOG_FILE) as monitor_1:
+            with processor_2.node.account.monitor_log(processor_2.LOG_FILE) as monitor_2:
+                with processor_3.node.account.monitor_log(processor_3.LOG_FILE) as monitor_3:
+                    self.kafka.start_node(node)
+
+                    monitor_1.wait_until(self.connected_message,
+                                         timeout_sec=120,
+                                         err_msg=("Never saw '%s' on " % self.connected_message) + str(processor.node.account))
+                    monitor_2.wait_until(self.connected_message,
+                                         timeout_sec=120,
+                                         err_msg=("Never saw '%s' on " % self.connected_message) + str(processor_2.node.account))
+                    monitor_3.wait_until(self.connected_message,
+                                         timeout_sec=120,
+                                         err_msg=("Never saw '%s' on " % self.connected_message) + str(processor_3.node.account))
+
+        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor_1:
+            with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2:
+                with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3:
+
+                    self.assert_produce(self.inputTopic,
+                                        "sending_message_after_hard_bouncing_streams_instance_bouncing_broker",
+                                        num_messages=self.num_messages,
+                                        timeout_sec=120)
+
+                    monitor_1.wait_until(self.message,
+                                         timeout_sec=120,
+                                         err_msg=("Never saw '%s' on " % self.message) + str(processor.node.account))
+                    monitor_2.wait_until(self.message,
+                                         timeout_sec=120,
+                                         err_msg=("Never saw '%s' on " % self.message) + str(processor_2.node.account))
+                    monitor_3.wait_until(self.message,
+                                         timeout_sec=120,
+                                         err_msg=("Never saw '%s' on " % self.message) + str(processor_3.node.account))
+
+                    self.assert_consume(self.client_id,
+                                        "consuming_message_after_stopping_streams_instance_bouncing_broker",
+                                        self.outputTopic,
+                                        num_messages=self.num_messages,
+                                        timeout_sec=120)
         self.kafka.stop()