You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/02/08 01:21:38 UTC
[kafka] branch trunk updated: MINOR: adding system tests for how
streams functions with broker failures (#4513)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6780338 MINOR: adding system tests for how streams functions with broker failures (#4513)
6780338 is described below
commit 67803384d9b7959661bb2a32129b03a374507c8a
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed Feb 7 20:21:35 2018 -0500
MINOR: adding system tests for how streams functions with broker failures (#4513)
System test for two cases:
* Starting a multi-node streams application with the broker down initially; once the broker starts, confirm that the rebalance completes and the streams application is still able to process records.
* With a multi-node streams app running, the broker goes down; stop one or more stream instances and confirm that, after the broker comes back, the remaining streams instance(s) still function.
Reviewers: Guozhang Wang <gu...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../tests/StreamsBrokerDownResilienceTest.java | 10 +-
tests/kafkatest/services/streams.py | 18 +++
.../streams/streams_broker_down_resilience_test.py | 138 ++++++++++++++++++---
3 files changed, 143 insertions(+), 23 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
index c8462ca..ed4cd27 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
@@ -78,13 +78,16 @@ public class StreamsBrokerDownResilienceTest {
System.exit(1);
}
-
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(Collections.singletonList(SOURCE_TOPIC_1), Consumed.with(stringSerde, stringSerde))
.peek(new ForeachAction<String, String>() {
+ int messagesProcessed = 0;
@Override
public void apply(String key, String value) {
System.out.println("received key " + key + " and value " + value);
+ messagesProcessed++;
+ System.out.println("processed" + messagesProcessed + "messages");
+ System.out.flush();
}
}).to(SINK_TOPIC);
@@ -104,8 +107,9 @@ public class StreamsBrokerDownResilienceTest {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
- System.out.println("Shutting down streams now");
- streams.close(10, TimeUnit.SECONDS);
+ streams.close(30, TimeUnit.SECONDS);
+ System.out.println("Complete shutdown of streams resilience test app now");
+ System.out.flush();
}
}));
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 9c4bd87..0f484b4 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -208,9 +208,27 @@ class StreamsBrokerCompatibilityService(StreamsTestBaseService):
"org.apache.kafka.streams.tests.BrokerCompatibilityTest",
eosEnabled)
+
class StreamsBrokerDownResilienceService(StreamsTestBaseService):
def __init__(self, test_context, kafka, configs):
super(StreamsBrokerDownResilienceService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsBrokerDownResilienceTest",
configs)
+
+ def start_cmd(self, node):
+ args = self.args.copy()
+ args['kafka'] = self.kafka.bootstrap_servers(validate=False)
+ args['state_dir'] = self.PERSISTENT_ROOT
+ args['stdout'] = self.STDOUT_FILE
+ args['stderr'] = self.STDERR_FILE
+ args['pidfile'] = self.PID_FILE
+ args['log4j'] = self.LOG4J_CONFIG_FILE
+ args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+ cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+ "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
+ " %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
+ " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+ return cmd
diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index bd90d9f..7a0560d 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -40,61 +40,76 @@ class StreamsBrokerDownResilience(Test):
num_nodes=1,
zk=self.zk,
topics={
- self.inputTopic: {'partitions': 1, 'replication-factor': 1},
+ self.inputTopic: {'partitions': 3, 'replication-factor': 1},
self.outputTopic: {'partitions': 1, 'replication-factor': 1}
})
- def get_consumer(self):
+ def get_consumer(self, num_messages):
return VerifiableConsumer(self.test_context,
1,
self.kafka,
self.outputTopic,
"stream-broker-resilience-verify-consumer",
- max_messages=self.num_messages)
+ max_messages=num_messages)
- def get_producer(self):
+ def get_producer(self, num_messages):
return VerifiableProducer(self.test_context,
1,
self.kafka,
self.inputTopic,
- max_messages=self.num_messages,
+ max_messages=num_messages,
acks=1)
- def assert_produce_consume(self, test_state):
- producer = self.get_producer()
+ def assert_produce_consume(self, test_state, num_messages=5):
+ producer = self.get_producer(num_messages)
producer.start()
- wait_until(lambda: producer.num_acked > 0,
+ wait_until(lambda: producer.num_acked >= num_messages,
timeout_sec=30,
err_msg="At %s failed to send messages " % test_state)
- consumer = self.get_consumer()
+ consumer = self.get_consumer(num_messages)
consumer.start()
- wait_until(lambda: consumer.total_consumed() > 0,
+ wait_until(lambda: consumer.total_consumed() >= num_messages,
timeout_sec=60,
err_msg="At %s streams did not process messages in 60 seconds " % test_state)
- def setUp(self):
- self.zk.start()
-
- def test_streams_resilient_to_broker_down(self):
- self.kafka.start()
-
+ @staticmethod
+ def get_configs(extra_configs=""):
# Consumer max.poll.interval > min(max.block.ms, ((retries + 1) * request.timeout)
consumer_poll_ms = "consumer.max.poll.interval.ms=50000"
retries_config = "producer.retries=2"
request_timeout = "producer.request.timeout.ms=15000"
max_block_ms = "producer.max.block.ms=30000"
+ # java code expects configs in key=value,key=value format
+ updated_configs = consumer_poll_ms + "," + retries_config + "," + request_timeout + "," + max_block_ms + extra_configs
+
+ return updated_configs
+
+ def wait_for_verification(self, processor, message, file, num_lines=1):
+ wait_until(lambda: self.verify_from_file(processor, message, file) >= num_lines,
+ timeout_sec=60,
+ err_msg="Did expect to read '%s' from %s" % (message, processor.node.account))
+
+ @staticmethod
+ def verify_from_file(processor, message, file):
+ result = processor.node.account.ssh_output("grep '%s' %s | wc -l" % (message, file), allow_fail=False)
+ return int(result)
+
+
+ def setUp(self):
+ self.zk.start()
+
+ def test_streams_resilient_to_broker_down(self):
+ self.kafka.start()
+
# Broker should be down over 2x of retries * timeout ms
# So with (2 * 15000) = 30 seconds, we'll set downtime to 70 seconds
broker_down_time_in_seconds = 70
- # java code expects configs in key=value,key=value format
- updated_configs = consumer_poll_ms + "," + retries_config + "," + request_timeout + "," + max_block_ms
-
- processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, updated_configs)
+ processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, self.get_configs())
processor.start()
# until KIP-91 is merged we'll only send 5 messages to assert Kafka Streams is running before taking the broker down
@@ -112,3 +127,86 @@ class StreamsBrokerDownResilience(Test):
self.assert_produce_consume("after_broker_stop")
self.kafka.stop()
+
+ def test_streams_runs_with_broker_down_initially(self):
+ self.kafka.start()
+ node = self.kafka.leader(self.inputTopic)
+ self.kafka.stop_node(node)
+
+ configs = self.get_configs(extra_configs=",application.id=starting_wo_broker_id")
+
+ # start streams with broker down initially
+ processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
+ processor.start()
+
+ processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
+ processor_2.start()
+
+ processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
+ processor_3.start()
+
+ broker_unavailable_message = "Broker may not be available"
+
+ # verify streams instances unable to connect to broker, kept trying
+ self.wait_for_verification(processor, broker_unavailable_message, processor.LOG_FILE, 100)
+ self.wait_for_verification(processor_2, broker_unavailable_message, processor_2.LOG_FILE, 100)
+ self.wait_for_verification(processor_3, broker_unavailable_message, processor_3.LOG_FILE, 100)
+
+ # now start broker
+ self.kafka.start_node(node)
+
+ # assert streams can process when starting with broker down
+ self.assert_produce_consume("running_with_broker_down_initially", num_messages=9)
+
+ message = "processed3messages"
+ # need to show all 3 instances processed messages
+ self.wait_for_verification(processor, message, processor.STDOUT_FILE)
+ self.wait_for_verification(processor_2, message, processor_2.STDOUT_FILE)
+ self.wait_for_verification(processor_3, message, processor_3.STDOUT_FILE)
+
+ self.kafka.stop()
+
+ def test_streams_should_scale_in_while_brokers_down(self):
+ self.kafka.start()
+
+ configs = self.get_configs(extra_configs=",application.id=shutdown_with_broker_down")
+
+ processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
+ processor.start()
+
+ processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
+ processor_2.start()
+
+ processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
+ processor_3.start()
+
+ # need to wait for rebalance once
+ self.wait_for_verification(processor_3, "State transition from REBALANCING to RUNNING", processor_3.LOG_FILE)
+
+ # assert streams can process when starting with broker down
+ self.assert_produce_consume("waiting for rebalance to complete", num_messages=9)
+
+ message = "processed3messages"
+
+ self.wait_for_verification(processor, message, processor.STDOUT_FILE)
+ self.wait_for_verification(processor_2, message, processor_2.STDOUT_FILE)
+ self.wait_for_verification(processor_3, message, processor_3.STDOUT_FILE)
+
+ node = self.kafka.leader(self.inputTopic)
+ self.kafka.stop_node(node)
+
+ processor.stop()
+ processor_2.stop()
+
+ shutdown_message = "Complete shutdown of streams resilience test app now"
+ self.wait_for_verification(processor, shutdown_message, processor.STDOUT_FILE)
+ self.wait_for_verification(processor_2, shutdown_message, processor_2.STDOUT_FILE)
+
+ self.kafka.start_node(node)
+
+ self.assert_produce_consume("sending_message_after_stopping_streams_instance_bouncing_broker", num_messages=9)
+
+ self.wait_for_verification(processor_3, "processed9messages", processor_3.STDOUT_FILE)
+
+ self.kafka.stop()
+
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.