Posted to commits@kafka.apache.org by gu...@apache.org on 2018/02/08 01:21:38 UTC

[kafka] branch trunk updated: MINOR: adding system tests for how streams functions with broker failures (#4513)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6780338  MINOR: adding system tests for how streams functions with broker failures (#4513)
6780338 is described below

commit 67803384d9b7959661bb2a32129b03a374507c8a
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed Feb 7 20:21:35 2018 -0500

    MINOR: adding system tests for how streams functions with broker failures (#4513)
    
    System tests for two cases:
    
    * Start a multi-node Streams application with the broker down initially; once the broker starts, confirm that the rebalance completes and the Streams application is still able to process records.
    
    * With a multi-node Streams application running, take the broker down and stop some Streams instance(s); confirm that after the broker comes back, the remaining Streams instance(s) still function.
    
    Reviewers: Guozhang Wang <gu...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../tests/StreamsBrokerDownResilienceTest.java     |  10 +-
 tests/kafkatest/services/streams.py                |  18 +++
 .../streams/streams_broker_down_resilience_test.py | 138 ++++++++++++++++++---
 3 files changed, 143 insertions(+), 23 deletions(-)
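
The new Python tests verify progress by grepping a service's log or stdout file over SSH inside a ducktape wait_until loop. A condensed, self-contained sketch of that pattern (names mirror the test code in this commit; ducktape's wait_until and the node account's ssh_output are the only APIs assumed):

    # Condensed sketch of the grep-and-wait verification loop used by the
    # tests in this commit; `processor` is a ducktape service whose node
    # account can run remote commands via ssh_output().
    from ducktape.utils.util import wait_until

    def occurrences(processor, message, file):
        # Count occurrences of `message` in `file` on the service's node.
        out = processor.node.account.ssh_output(
            "grep '%s' %s | wc -l" % (message, file), allow_fail=False)
        return int(out)

    def wait_for_message(processor, message, file, num_lines=1):
        # Block until the marker shows up at least `num_lines` times.
        wait_until(lambda: occurrences(processor, message, file) >= num_lines,
                   timeout_sec=60,
                   err_msg="Expected to read '%s' from %s" % (message, file))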

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
index c8462ca..ed4cd27 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
@@ -78,13 +78,16 @@ public class StreamsBrokerDownResilienceTest {
 
             System.exit(1);
         }
-
         final StreamsBuilder builder = new StreamsBuilder();
         builder.stream(Collections.singletonList(SOURCE_TOPIC_1), Consumed.with(stringSerde, stringSerde))
             .peek(new ForeachAction<String, String>() {
+                int messagesProcessed = 0;
                 @Override
                 public void apply(String key, String value) {
                     System.out.println("received key " + key + " and value " + value);
+                    messagesProcessed++;
+                    System.out.println("processed" + messagesProcessed + "messages");
+                    System.out.flush();
                 }
             }).to(SINK_TOPIC);
 
@@ -104,8 +107,9 @@ public class StreamsBrokerDownResilienceTest {
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
             @Override
             public void run() {
-                System.out.println("Shutting down streams now");
-                streams.close(10, TimeUnit.SECONDS);
+                streams.close(30, TimeUnit.SECONDS);
+                System.out.println("Complete shutdown of streams resilience test app now");
+                System.out.flush();
             }
         }));
 
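
One detail of the Java change above is easy to misread as a typo: the marker is printed as "processed" + count + "messages" with no spaces, and stdout is flushed, so the Python tests can grep the service's stdout file for an exact token. A small sketch of the correspondence (the markers below appear verbatim in the tests later in this diff):

    # The grep pattern must match the Java marker byte-for-byte, which is
    # why the Java side concatenates without spaces.
    def processed_marker(num_messages):
        return "processed%dmessages" % num_messages

    assert processed_marker(3) == "processed3messages"  # grepped per instance
    assert processed_marker(9) == "processed9messages"  # grepped after scale-in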
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 9c4bd87..0f484b4 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -208,9 +208,27 @@ class StreamsBrokerCompatibilityService(StreamsTestBaseService):
                                                                 "org.apache.kafka.streams.tests.BrokerCompatibilityTest",
                                                                 eosEnabled)
 
+
 class StreamsBrokerDownResilienceService(StreamsTestBaseService):
     def __init__(self, test_context, kafka, configs):
         super(StreamsBrokerDownResilienceService, self).__init__(test_context,
                                                                  kafka,
                                                                  "org.apache.kafka.streams.tests.StreamsBrokerDownResilienceTest",
                                                                  configs)
+
+    def start_cmd(self, node):
+        args = self.args.copy()
+        args['kafka'] = self.kafka.bootstrap_servers(validate=False)
+        args['state_dir'] = self.PERSISTENT_ROOT
+        args['stdout'] = self.STDOUT_FILE
+        args['stderr'] = self.STDERR_FILE
+        args['pidfile'] = self.PID_FILE
+        args['log4j'] = self.LOG4J_CONFIG_FILE
+        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+              "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
+              " %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
+              " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+        return cmd
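
Two things are going on in start_cmd above: the shell command backgrounds the JVM and writes its PID through file descriptor 3 into the pidfile while stdout and stderr append to the log files, and the command string is assembled with Python's dict-based "%" interpolation. A minimal illustration of how the interpolation expands (the paths here are hypothetical stand-ins for the StreamsTestBaseService constants):

    # Hypothetical paths, for illustration only; the real values come from
    # the service constants (PERSISTENT_ROOT, STDOUT_FILE, PID_FILE, ...).
    args = {
        'stdout': '/mnt/streams/streams.stdout',
        'stderr': '/mnt/streams/streams.stderr',
        'pidfile': '/mnt/streams/streams.pid',
    }
    # The subshell backgrounds the app, echoes its PID to fd 3 (redirected
    # to the pidfile), and appends stdout/stderr to the log files.
    cmd = "( my_app & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
    print(cmd)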
diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index bd90d9f..7a0560d 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -40,61 +40,76 @@ class StreamsBrokerDownResilience(Test):
                                   num_nodes=1,
                                   zk=self.zk,
                                   topics={
-                                      self.inputTopic: {'partitions': 1, 'replication-factor': 1},
+                                      self.inputTopic: {'partitions': 3, 'replication-factor': 1},
                                       self.outputTopic: {'partitions': 1, 'replication-factor': 1}
                                   })
 
-    def get_consumer(self):
+    def get_consumer(self, num_messages):
         return VerifiableConsumer(self.test_context,
                                   1,
                                   self.kafka,
                                   self.outputTopic,
                                   "stream-broker-resilience-verify-consumer",
-                                  max_messages=self.num_messages)
+                                  max_messages=num_messages)
 
-    def get_producer(self):
+    def get_producer(self, num_messages):
         return VerifiableProducer(self.test_context,
                                   1,
                                   self.kafka,
                                   self.inputTopic,
-                                  max_messages=self.num_messages,
+                                  max_messages=num_messages,
                                   acks=1)
 
-    def assert_produce_consume(self, test_state):
-        producer = self.get_producer()
+    def assert_produce_consume(self, test_state, num_messages=5):
+        producer = self.get_producer(num_messages)
         producer.start()
 
-        wait_until(lambda: producer.num_acked > 0,
+        wait_until(lambda: producer.num_acked >= num_messages,
                    timeout_sec=30,
                    err_msg="At %s failed to send messages " % test_state)
 
-        consumer = self.get_consumer()
+        consumer = self.get_consumer(num_messages)
         consumer.start()
 
-        wait_until(lambda: consumer.total_consumed() > 0,
+        wait_until(lambda: consumer.total_consumed() >= num_messages,
                    timeout_sec=60,
                    err_msg="At %s streams did not process messages in 60 seconds " % test_state)
 
-    def setUp(self):
-        self.zk.start()
-
-    def test_streams_resilient_to_broker_down(self):
-        self.kafka.start()
-
+    @staticmethod
+    def get_configs(extra_configs=""):
         # Consumer max.poll.interval.ms > min(max.block.ms, (retries + 1) * request.timeout.ms)
         consumer_poll_ms = "consumer.max.poll.interval.ms=50000"
         retries_config = "producer.retries=2"
         request_timeout = "producer.request.timeout.ms=15000"
         max_block_ms = "producer.max.block.ms=30000"
 
+        # java code expects configs in key=value,key=value format
+        updated_configs = consumer_poll_ms + "," + retries_config + "," + request_timeout + "," + max_block_ms + extra_configs
+
+        return updated_configs
+
+    def wait_for_verification(self, processor, message, file, num_lines=1):
+        wait_until(lambda: self.verify_from_file(processor, message, file) >= num_lines,
+                   timeout_sec=60,
+                   err_msg="Expected to read '%s' from %s" % (message, processor.node.account))
+
+    @staticmethod
+    def verify_from_file(processor, message, file):
+        result = processor.node.account.ssh_output("grep '%s' %s | wc -l" % (message, file), allow_fail=False)
+        return int(result)
+
+
+    def setUp(self):
+        self.zk.start()
+
+    def test_streams_resilient_to_broker_down(self):
+        self.kafka.start()
+
         # Broker should be down for more than 2x of (retries * request.timeout.ms)
         # retries * timeout = 2 * 15000 ms = 30 seconds, so 2x is 60 seconds; we set downtime to 70 seconds
         broker_down_time_in_seconds = 70
 
-        # java code expects configs in key=value,key=value format
-        updated_configs = consumer_poll_ms + "," + retries_config + "," + request_timeout + "," + max_block_ms
-
-        processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, updated_configs)
+        processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, self.get_configs())
         processor.start()
 
         # until KIP-91 is merged we'll only send 5 messages to assert Kafka Streams is running before taking the broker down
@@ -112,3 +127,86 @@ class StreamsBrokerDownResilience(Test):
         self.assert_produce_consume("after_broker_stop")
 
         self.kafka.stop()
+
+    def test_streams_runs_with_broker_down_initially(self):
+        self.kafka.start()
+        node = self.kafka.leader(self.inputTopic)
+        self.kafka.stop_node(node)
+
+        configs = self.get_configs(extra_configs=",application.id=starting_wo_broker_id")
+
+        # start streams with broker down initially
+        processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
+        processor.start()
+
+        processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
+        processor_2.start()
+
+        processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
+        processor_3.start()
+
+        broker_unavailable_message = "Broker may not be available"
+
+        # verify the streams instances are unable to connect to the broker and keep retrying
+        self.wait_for_verification(processor, broker_unavailable_message, processor.LOG_FILE, 100)
+        self.wait_for_verification(processor_2, broker_unavailable_message, processor_2.LOG_FILE, 100)
+        self.wait_for_verification(processor_3, broker_unavailable_message, processor_3.LOG_FILE, 100)
+
+        # now start broker
+        self.kafka.start_node(node)
+
+        # assert streams can process when starting with broker down
+        self.assert_produce_consume("running_with_broker_down_initially", num_messages=9)
+
+        message = "processed3messages"
+        # need to show all 3 instances processed messages
+        self.wait_for_verification(processor, message, processor.STDOUT_FILE)
+        self.wait_for_verification(processor_2, message, processor_2.STDOUT_FILE)
+        self.wait_for_verification(processor_3, message, processor_3.STDOUT_FILE)
+
+        self.kafka.stop()
+
+    def test_streams_should_scale_in_while_brokers_down(self):
+        self.kafka.start()
+
+        configs = self.get_configs(extra_configs=",application.id=shutdown_with_broker_down")
+
+        processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
+        processor.start()
+
+        processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
+        processor_2.start()
+
+        processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
+        processor_3.start()
+
+        # wait for the rebalance to complete once (checking one instance suffices)
+        self.wait_for_verification(processor_3, "State transition from REBALANCING to RUNNING", processor_3.LOG_FILE)
+
+        # assert streams can process once the rebalance has completed
+        self.assert_produce_consume("waiting for rebalance to complete", num_messages=9)
+
+        message = "processed3messages"
+
+        self.wait_for_verification(processor, message, processor.STDOUT_FILE)
+        self.wait_for_verification(processor_2, message, processor_2.STDOUT_FILE)
+        self.wait_for_verification(processor_3, message, processor_3.STDOUT_FILE)
+
+        node = self.kafka.leader(self.inputTopic)
+        self.kafka.stop_node(node)
+
+        processor.stop()
+        processor_2.stop()
+
+        shutdown_message = "Complete shutdown of streams resilience test app now"
+        self.wait_for_verification(processor, shutdown_message, processor.STDOUT_FILE)
+        self.wait_for_verification(processor_2, shutdown_message, processor_2.STDOUT_FILE)
+
+        self.kafka.start_node(node)
+
+        self.assert_produce_consume("sending_message_after_stopping_streams_instance_bouncing_broker", num_messages=9)
+
+        self.wait_for_verification(processor_3, "processed9messages", processor_3.STDOUT_FILE)
+
+        self.kafka.stop()
+
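
Finally, note the comment above that the Java code expects its configs as a single "key=value,key=value" argument, which is why get_configs() joins everything with commas. A hedged sketch of how such a string unpacks (illustration only; the actual parsing in StreamsBrokerDownResilienceTest may differ):

    # Illustration only: unpack a "key=value,key=value" config string like
    # the one get_configs() builds; not the actual Java-side parser.
    def parse_configs(config_string):
        return dict(kv.split("=", 1) for kv in config_string.split(",") if kv)

    configs = parse_configs(
        "consumer.max.poll.interval.ms=50000,producer.retries=2,"
        "producer.request.timeout.ms=15000,producer.max.block.ms=30000")
    assert configs["producer.retries"] == "2"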
