You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/03/05 19:06:40 UTC
[kafka] branch trunk updated: MINOR: Add System test for standby
task-rebalancing (#4554)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8a7d7e7 MINOR: Add System test for standby task-rebalancing (#4554)
8a7d7e7 is described below
commit 8a7d7e7955889125b1196caeded6fa57d93a0b46
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Mon Mar 5 14:06:32 2018 -0500
MINOR: Add System test for standby task-rebalancing (#4554)
Author: Bill Bejeck <bi...@confluent.io>
Reviewers: Damian Guy <da...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../tests/StreamsRepeatingIntegerKeyProducer.java | 108 ++++++++++++++
.../streams/tests/StreamsStandByReplicaTest.java | 162 ++++++++++++++++++++
.../apache/kafka/streams/tests/SystemTestUtil.java | 62 ++++++++
.../kafka/streams/tests/SystemTestUtilTest.java | 75 ++++++++++
tests/kafkatest/services/streams.py | 17 ++-
.../tests/streams/streams_standby_replica_test.py | 166 +++++++++++++++++++++
6 files changed, 589 insertions(+), 1 deletion(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java
new file mode 100644
index 0000000..85ca077
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Utility class used to send messages with integer keys
+ * repeating in sequence every 1000 messages. Multiple topics for publishing
+ * can be provided in the config map with key of 'topics' and ';' delimited list of output topics.
+ * The optional 'num_messages' config (default 1000) is the number of messages to send per topic.
+ */
+public class StreamsRepeatingIntegerKeyProducer {
+
+    // Cleared by the JVM shutdown hook so the send loop can stop early.
+    private static volatile boolean keepProducing = true;
+
+    // Incremented by the main loop and decremented from the send callback. Those are
+    // read-modify-write operations issued from different places, so a plain volatile int
+    // could lose updates; an AtomicInteger keeps the count exact.
+    private static final AtomicInteger messageCounter = new AtomicInteger(0);
+
+    /**
+     * Sends records round-robin to every configured topic until the requested number of
+     * messages has been sent or the JVM is shutting down.
+     *
+     * @param args args[0] is the bootstrap server list (default "localhost:9092"), args[1] is
+     *             ignored, args[2] is the config string in key=value,key=value format
+     */
+    public static void main(String[] args) {
+        System.out.println("StreamsTest instance started");
+
+        final String kafka = args.length > 0 ? args[0] : "localhost:9092";
+        final String configString = args.length > 2 ? args[2] : null;
+
+        final Map<String, String> configs = SystemTestUtil.parseConfigs(configString);
+        System.out.println("Using provided configs " + configs);
+
+        final int numMessages = configs.containsKey("num_messages") ? Integer.parseInt(configs.get("num_messages")) : 1000;
+
+        // Fail with a readable message instead of a NullPointerException on split().
+        final String topicList = configs.get("topics");
+        if (topicList == null) {
+            System.err.println("ERROR: required config 'topics' was not provided");
+            System.err.flush();
+            System.exit(1);
+        }
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "StreamsRepeatingIntegerKeyProducer");
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+
+        final String value = "testingValue";
+        int key = 0;
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                keepProducing = false;
+            }
+        }));
+
+        final String[] topics = topicList.split(";");
+        final int totalMessagesToProduce = numMessages * topics.length;
+
+        try (final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(producerProps)) {
+
+            while (keepProducing && messageCounter.get() < totalMessagesToProduce) {
+                for (final String topic : topics) {
+                    final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, Integer.toString(key), value + key);
+                    kafkaProducer.send(producerRecord, new Callback() {
+                        @Override
+                        public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+                            if (exception == null) {
+                                return;
+                            }
+                            exception.printStackTrace(System.err);
+                            System.err.flush();
+                            if (exception instanceof TimeoutException) {
+                                // Expired records were counted as sent; give the count back so the loop re-sends.
+                                // NOTE(review): if each record of an expired batch gets its own callback carrying
+                                // the batch-wide count, this over-compensates - confirm against producer behaviour.
+                                messageCounter.addAndGet(-expiredRecordCount(exception.getMessage()));
+                            }
+                        }
+                    });
+                    messageCounter.incrementAndGet();
+                }
+                key += 1;
+                if (key % 1000 == 0) {
+                    System.out.println("Sent 1000 messages");
+                    Utils.sleep(100);
+                    key = 0;
+                }
+            }
+        }
+        System.out.println("Producer shut down now, sent total " + messageCounter.get() + " of requested " + totalMessagesToProduce);
+        System.out.flush();
+    }
+
+    /**
+     * Extracts the number of expired records from a timeout message of the form
+     * "[<exception class>: ]Expiring 4 record(s) for data-0: 30004 ms has passed ...".
+     * The count is located as the token following "Expiring", so the result does not depend
+     * on whether the message carries the exception class-name prefix.
+     *
+     * @param message the exception message, may be null
+     * @return the parsed count, or 0 when the message does not match the expected form
+     */
+    private static int expiredRecordCount(final String message) {
+        if (message == null) {
+            return 0;
+        }
+        final String[] tokens = message.split(" ");
+        for (int i = 0; i + 1 < tokens.length; i++) {
+            if ("Expiring".equals(tokens[i])) {
+                try {
+                    return Integer.parseInt(tokens[i + 1]);
+                } catch (final NumberFormatException notACount) {
+                    // Unknown message layout: leave the counter untouched rather than guess.
+                    return 0;
+                }
+            }
+        }
+        return 0;
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
new file mode 100644
index 0000000..0e3aa13
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Streams application used by the standby-replica system test. It counts records per key
+ * from a source topic twice, once into an in-memory store and once into a persistent store,
+ * writes each count stream to its own sink topic, and prints the number of active and
+ * standby tasks per thread after every rebalance.
+ */
+public class StreamsStandByReplicaTest {
+
+    /**
+     * Builds and starts the topology.
+     *
+     * @param args args[0] is the bootstrap server list (default "localhost:9092"), args[1] is
+     *             the state directory (default: a fresh temp directory), args[2] is the config
+     *             string in key=value,key=value format; it must contain sourceTopic, sinkTopic1
+     *             and sinkTopic2 plus the client configs checked by confirmCorrectConfigs
+     */
+    public static void main(String[] args) {
+
+        System.out.println("StreamsTest instance started");
+
+        final String kafka = args.length > 0 ? args[0] : "localhost:9092";
+        final String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
+        final String additionalConfigs = args.length > 2 ? args[2] : null;
+
+        final Serde<String> stringSerde = Serdes.String();
+
+        final Properties streamsProperties = new Properties();
+        streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-standby-tasks");
+        streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsProperties.put(StreamsConfig.STATE_DIR_CONFIG, stateDirStr);
+        streamsProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
+
+        final Map<String, String> updated = SystemTestUtil.parseConfigs(additionalConfigs);
+        System.out.println("Updating configs with " + updated);
+
+        // The topic names are test parameters, not Streams configs, so they are removed
+        // from the map before the remainder is merged into the Streams properties.
+        final String sourceTopic = updated.remove("sourceTopic");
+        final String sinkTopic1 = updated.remove("sinkTopic1");
+        final String sinkTopic2 = updated.remove("sinkTopic2");
+
+        if (sourceTopic == null || sinkTopic1 == null || sinkTopic2 == null) {
+            System.err.println(String.format("one or more required topics null sourceTopic[%s], sinkTopic1[%s], sinkTopic2[%s]", sourceTopic, sinkTopic1, sinkTopic2));
+            System.err.flush();
+            System.exit(1);
+        }
+
+        streamsProperties.putAll(updated);
+
+        if (!confirmCorrectConfigs(streamsProperties)) {
+            System.err.println(String.format("ERROR: Did not have all required configs expected to contain %s, %s, %s, %s",
+                                             StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
+                                             StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG),
+                                             StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                                             StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG)));
+            // Flush before exiting, matching the topic-validation branch above.
+            System.err.flush();
+            System.exit(1);
+        }
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final String inMemoryStoreName = "in-memory-store";
+        final String persistentMemoryStoreName = "persistent-memory-store";
+
+        final KeyValueBytesStoreSupplier inMemoryStoreSupplier = Stores.inMemoryKeyValueStore(inMemoryStoreName);
+        final KeyValueBytesStoreSupplier persistentStoreSupplier = Stores.persistentKeyValueStore(persistentMemoryStoreName);
+
+        final KStream<String, String> inputStream = builder.stream(sourceTopic, Consumed.with(stringSerde, stringSerde));
+
+        // Counts are Long; the sink topics are written with String serdes.
+        final ValueMapper<Long, String> countMapper = new ValueMapper<Long, String>() {
+            @Override
+            public String apply(final Long value) {
+                return value.toString();
+            }
+        };
+
+        inputStream.groupByKey().count(Materialized.<String, Long>as(inMemoryStoreSupplier)).toStream().mapValues(countMapper)
+            .to(sinkTopic1, Produced.with(stringSerde, stringSerde));
+
+        inputStream.groupByKey().count(Materialized.<String, Long>as(persistentStoreSupplier)).toStream().mapValues(countMapper)
+            .to(sinkTopic2, Produced.with(stringSerde, stringSerde));
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties);
+
+        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(final Thread t, final Throwable e) {
+                System.err.println("FATAL: An unexpected exception " + e);
+                e.printStackTrace(System.err);
+                System.err.flush();
+                shutdown(streams);
+            }
+        });
+
+        // The system test greps stdout for these ACTIVE_TASKS/STANDBY_TASKS lines, so the
+        // format must stay exactly as is.
+        streams.setStateListener(new KafkaStreams.StateListener() {
+            @Override
+            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
+                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+                    final Set<ThreadMetadata> threadMetadata = streams.localThreadsMetadata();
+                    for (final ThreadMetadata threadMetadatum : threadMetadata) {
+                        System.out.println("ACTIVE_TASKS:" + threadMetadatum.activeTasks().size() + " STANDBY_TASKS:" + threadMetadatum.standbyTasks().size());
+                    }
+                }
+            }
+        });
+
+        // Register the hook before starting so a termination signal that arrives right
+        // after start() still closes the instance.
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                shutdown(streams);
+                System.out.println("Shut down streams now");
+            }
+        }));
+
+        System.out.println("Start Kafka Streams");
+        streams.start();
+    }
+
+    /**
+     * Closes the Streams instance, waiting at most 10 seconds.
+     */
+    private static void shutdown(final KafkaStreams streams) {
+        streams.close(10, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Checks that the consumer max-poll-interval and the producer retries, request-timeout
+     * and max-block settings were all supplied.
+     *
+     * @return true only when all four prefixed keys are present
+     */
+    private static boolean confirmCorrectConfigs(final Properties properties) {
+        return properties.containsKey(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) &&
+               properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG)) &&
+               properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG)) &&
+               properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG));
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtil.java
new file mode 100644
index 0000000..4ddbf69
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtil.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.tests;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Class for common convenience methods for working on
+ * System tests
+ */
+public class SystemTestUtil {
+
+    private static final int KEY = 0;
+    private static final int VALUE = 1;
+
+    /**
+     * Takes a string with keys and values separated by '=' and each key value pair
+     * separated by ',' for example max.block.ms=5000,retries=6,request.timeout.ms=6000
+     *
+     * This class makes it easier to pass configs from the system test in python to the Java test.
+     *
+     * @param formattedConfigs the formatted config string
+     * @return HashMap with keys and values inserted
+     * @throws NullPointerException if {@code formattedConfigs} is null
+     * @throws IllegalStateException if the string contains no '=', or if any ','-separated
+     *                               entry is not exactly one key and one non-empty value
+     */
+    public static Map<String, String> parseConfigs(final String formattedConfigs) {
+        Objects.requireNonNull(formattedConfigs, "Formatted config String can't be null");
+
+        if (formattedConfigs.indexOf('=') == -1) {
+            throw new IllegalStateException(String.format("Provided string [ %s ] does not have expected key-value separator of '='", formattedConfigs));
+        }
+
+        final String[] parts = formattedConfigs.split(",");
+        final Map<String, String> configs = new HashMap<>();
+        for (final String part : parts) {
+            final String[] keyValue = part.split("=");
+            // More than one '=' inside a single entry means the pairs were not separated by ','.
+            if (keyValue.length > 2) {
+                throw new IllegalStateException(
+                    String.format("Provided string [ %s ] does not have expected key-value pair separator of ','", formattedConfigs));
+            }
+            // An entry such as "foo" or "foo=" has no value; reject it explicitly instead of
+            // failing with ArrayIndexOutOfBoundsException on keyValue[VALUE].
+            if (keyValue.length < 2) {
+                throw new IllegalStateException(
+                    String.format("Provided string [ %s ] contains entry [ %s ] that is not a key=value pair", formattedConfigs, part));
+            }
+            configs.put(keyValue[KEY], keyValue[VALUE]);
+        }
+        return configs;
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtilTest.java
new file mode 100644
index 0000000..f55450d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtilTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.tests;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link SystemTestUtil#parseConfigs(String)}.
+ */
+public class SystemTestUtilTest {
+
+    private final Map<String, String> expectedParsedMap = new TreeMap<>();
+
+    @Before
+    public void setUp() {
+        expectedParsedMap.put("foo", "foo1");
+        expectedParsedMap.put("bar", "bar1");
+        expectedParsedMap.put("baz", "baz1");
+    }
+
+    @Test
+    public void shouldParseCorrectMap() {
+        final String formattedConfigs = "foo=foo1,bar=bar1,baz=baz1";
+        final Map<String, String> parsedMap = SystemTestUtil.parseConfigs(formattedConfigs);
+        final TreeMap<String, String> sortedParsedMap = new TreeMap<>(parsedMap);
+        // JUnit's contract is assertEquals(expected, actual); keeping that order makes the
+        // failure message report the right side as "expected".
+        assertEquals(expectedParsedMap, sortedParsedMap);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowExceptionOnNull() {
+        SystemTestUtil.parseConfigs(null);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldThrowExceptionIfNotCorrectKeyValueSeparator() {
+        // ':' instead of '=' between key and value
+        final String badString = "foo:bar,baz:boo";
+        SystemTestUtil.parseConfigs(badString);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldThrowExceptionIfNotCorrectKeyValuePairSeparator() {
+        // ';' instead of ',' between pairs
+        final String badString = "foo=bar;baz=boo";
+        SystemTestUtil.parseConfigs(badString);
+    }
+
+    @Test
+    public void shouldParseSingleKeyValuePairString() {
+        final Map<String, String> expectedSinglePairMap = new HashMap<>();
+        expectedSinglePairMap.put("foo", "bar");
+        final String singleValueString = "foo=bar";
+        final Map<String, String> parsedMap = SystemTestUtil.parseConfigs(singleValueString);
+        assertEquals(expectedSinglePairMap, parsedMap);
+    }
+
+}
\ No newline at end of file
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 9df87ca..e552a39 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -18,7 +18,6 @@ import signal
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
-
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
@@ -236,3 +235,19 @@ class StreamsBrokerDownResilienceService(StreamsTestBaseService):
" %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
return cmd
+
+
+class StreamsStandbyTaskService(StreamsTestBaseService):
+    """Streams test service configured with the StreamsStandByReplicaTest Java class name."""
+
+    def __init__(self, test_context, kafka, configs):
+        streams_class_name = "org.apache.kafka.streams.tests.StreamsStandByReplicaTest"
+        super(StreamsStandbyTaskService, self).__init__(test_context, kafka, streams_class_name, configs)
+
+
+class StreamsRepeatingIntegerKeyProducerService(StreamsTestBaseService):
+    """Streams test service configured with the StreamsRepeatingIntegerKeyProducer Java class name."""
+
+    def __init__(self, test_context, kafka, configs):
+        streams_class_name = "org.apache.kafka.streams.tests.StreamsRepeatingIntegerKeyProducer"
+        super(StreamsRepeatingIntegerKeyProducerService, self).__init__(test_context, kafka, streams_class_name, configs)
diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
new file mode 100644
index 0000000..533d4b5
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -0,0 +1,166 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.streams import StreamsRepeatingIntegerKeyProducerService
+from kafkatest.services.streams import StreamsStandbyTaskService
+from kafkatest.services.verifiable_consumer import VerifiableConsumer
+from kafkatest.services.zookeeper import ZookeeperService
+
+
+class StreamsStandbyTask(Test):
+    """
+    This test validates using standby tasks helps with rebalance times
+    additionally verifies standby replicas continue to work in the
+    face of continual changes to streams code base
+    """
+
+    streams_source_topic = "standbyTaskSource1"
+    streams_sink_topic_1 = "standbyTaskSink1"
+    streams_sink_topic_2 = "standbyTaskSink2"
+
+    num_messages = 60000
+
+    def __init__(self, test_context):
+        super(StreamsStandbyTask, self).__init__(test_context=test_context)
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context,
+                                  num_nodes=3,
+                                  zk=self.zk,
+                                  topics={
+                                      self.streams_source_topic: {'partitions': 6, 'replication-factor': 1},
+                                      self.streams_sink_topic_1: {'partitions': 1, 'replication-factor': 1},
+                                      self.streams_sink_topic_2: {'partitions': 1, 'replication-factor': 1}
+                                  })
+
+    def get_consumer(self, topic, num_messages):
+        """Create a single-node VerifiableConsumer on `topic` limited to `num_messages` messages."""
+        return VerifiableConsumer(self.test_context,
+                                  1,
+                                  self.kafka,
+                                  topic,
+                                  "stream-broker-resilience-verify-consumer",
+                                  max_messages=num_messages)
+
+    def assert_consume(self, test_state, topic, num_messages=5):
+        """Start a consumer on `topic` and wait until it has consumed at least `num_messages`."""
+        # Keep the timeout in one place so the error message cannot drift from the real value.
+        timeout_sec = 120
+        consumer = self.get_consumer(topic, num_messages)
+        consumer.start()
+
+        wait_until(lambda: consumer.total_consumed() >= num_messages,
+                   timeout_sec=timeout_sec,
+                   err_msg="At %s streams did not process messages in %s seconds " % (test_state, timeout_sec))
+
+    @staticmethod
+    def get_configs(extra_configs=""):
+        """
+        Build the config string for the Java streams test.
+
+        `extra_configs` is appended verbatim, so it must start with ',' when non-empty.
+        """
+        # Consumer max.poll.interval > min(max.block.ms, ((retries + 1) * request.timeout))
+        consumer_poll_ms = "consumer.max.poll.interval.ms=50000"
+        retries_config = "producer.retries=2"
+        request_timeout = "producer.request.timeout.ms=15000"
+        max_block_ms = "producer.max.block.ms=30000"
+
+        # java code expects configs in key=value,key=value format
+        updated_configs = consumer_poll_ms + "," + retries_config + "," + request_timeout + "," + max_block_ms + extra_configs
+
+        return updated_configs
+
+    def wait_for_verification(self, processor, message, file, num_lines=1, timeout_sec=20):
+        """Wait until `message` appears on at least `num_lines` lines of `file` on the processor's node."""
+        wait_until(lambda: self.verify_from_file(processor, message, file) >= num_lines,
+                   timeout_sec=timeout_sec,
+                   err_msg="Did expect to read '%s' from %s" % (message, processor.node.account))
+
+    @staticmethod
+    def verify_from_file(processor, message, file):
+        """Return the number of lines in `file` on the processor's node that match `message` (via grep)."""
+        result = processor.node.account.ssh_output("grep '%s' %s | wc -l" % (message, file), allow_fail=False)
+        return int(result)
+
+    def setUp(self):
+        self.zk.start()
+        self.kafka.start()
+
+    def test_standby_tasks_rebalance(self):
+        """
+        Start and stop three streams instances in turn and check, after each rebalance,
+        the active/standby task counts each instance prints; finally check that both sink
+        topics received the expected number of messages and the producer finished.
+        """
+
+        driver_configs = "num_messages=%s,topics=%s" % (str(self.num_messages), self.streams_source_topic)
+
+        driver = StreamsRepeatingIntegerKeyProducerService(self.test_context, self.kafka, driver_configs)
+        driver.start()
+
+        configs = self.get_configs(",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s" % (self.streams_source_topic,
+                                                                                   self.streams_sink_topic_1,
+                                                                                   self.streams_sink_topic_2))
+
+        processor_1 = StreamsStandbyTaskService(self.test_context, self.kafka, configs)
+        processor_2 = StreamsStandbyTaskService(self.test_context, self.kafka, configs)
+        processor_3 = StreamsStandbyTaskService(self.test_context, self.kafka, configs)
+
+        processor_1.start()
+
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:6 STANDBY_TASKS:0", processor_1.STDOUT_FILE)
+
+        processor_2.start()
+
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_2.STDOUT_FILE)
+
+        processor_3.start()
+
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE)
+
+        processor_1.stop()
+
+        # num_lines=2: this instance has already printed the same counts once before.
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_2.STDOUT_FILE, num_lines=2)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_3.STDOUT_FILE)
+
+        processor_2.stop()
+
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:6 STANDBY_TASKS:0", processor_3.STDOUT_FILE)
+
+        processor_1.start()
+
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_3.STDOUT_FILE, num_lines=2)
+
+        processor_2.start()
+
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE, num_lines=2)
+
+        processor_3.stop()
+
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_1.STDOUT_FILE, num_lines=2)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_2.STDOUT_FILE)
+
+        processor_1.stop()
+
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:6 STANDBY_TASKS:0", processor_2.STDOUT_FILE)
+
+        processor_3.start()
+        processor_1.start()
+
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE, num_lines=2)
+
+        self.assert_consume("assert all messages consumed from %s" % self.streams_sink_topic_1, self.streams_sink_topic_1, self.num_messages)
+        self.assert_consume("assert all messages consumed from %s" % self.streams_sink_topic_2, self.streams_sink_topic_2, self.num_messages)
+
+        self.wait_for_verification(driver, "Producer shut down now, sent total {0} of requested {0}".format(str(self.num_messages)),
+                                   driver.STDOUT_FILE)
+
+
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.