You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/03/05 19:06:40 UTC

[kafka] branch trunk updated: MINOR: Add System test for standby task-rebalancing (#4554)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8a7d7e7  MINOR: Add System test for standby task-rebalancing (#4554)
8a7d7e7 is described below

commit 8a7d7e7955889125b1196caeded6fa57d93a0b46
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Mon Mar 5 14:06:32 2018 -0500

    MINOR: Add System test for standby task-rebalancing (#4554)
    
    Author: Bill Bejeck <bi...@confluent.io>
    
    Reviewers: Damian Guy <da...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../tests/StreamsRepeatingIntegerKeyProducer.java  | 108 ++++++++++++++
 .../streams/tests/StreamsStandByReplicaTest.java   | 162 ++++++++++++++++++++
 .../apache/kafka/streams/tests/SystemTestUtil.java |  62 ++++++++
 .../kafka/streams/tests/SystemTestUtilTest.java    |  75 ++++++++++
 tests/kafkatest/services/streams.py                |  17 ++-
 .../tests/streams/streams_standby_replica_test.py  | 166 +++++++++++++++++++++
 6 files changed, 589 insertions(+), 1 deletion(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java
new file mode 100644
index 0000000..85ca077
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Utility class used to send messages with integer keys
+ * repeating in sequence every 1000 messages.  Multiple topics for publishing
+ * can be provided in the config map with key of 'topics' and ';' delimited list of output topics
+ */
+public class StreamsRepeatingIntegerKeyProducer {
+
+    private static volatile boolean keepProducing = true;
+    private volatile static int messageCounter = 0;
+
+    public static void main(String[] args) {
+        System.out.println("StreamsTest instance started");
+
+        final String kafka = args.length > 0 ? args[0] : "localhost:9092";
+        final String configString = args.length > 2 ? args[2] : null;
+
+        final Map<String, String> configs = SystemTestUtil.parseConfigs(configString);
+        System.out.println("Using provided configs " + configs);
+
+        final int numMessages = configs.containsKey("num_messages") ? Integer.parseInt(configs.get("num_messages")) : 1000;
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "StreamsRepeatingIntegerKeyProducer");
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+
+        final String value = "testingValue";
+        Integer key = 0;
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                keepProducing = false;
+            }
+        }));
+
+        final String[] topics = configs.get("topics").split(";");
+        final int totalMessagesToProduce = numMessages * topics.length;
+
+        try (final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(producerProps)) {
+
+            while (keepProducing && messageCounter < totalMessagesToProduce) {
+                for (final String topic : topics) {
+                    final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key.toString(), value + key);
+                    kafkaProducer.send(producerRecord, new Callback() {
+                        @Override
+                        public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+                            if (exception != null) {
+                                exception.printStackTrace(System.err);
+                                System.err.flush();
+                                if (exception instanceof TimeoutException) {
+                                    try {
+                                        // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time
+                                        final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]);
+                                        messageCounter -= expired;
+                                    } catch (final Exception ignore) {
+                                    }
+                                }
+                            }
+                        }
+                    });
+                    messageCounter += 1;
+                }
+                key += 1;
+                if (key % 1000 == 0) {
+                    System.out.println("Sent 1000 messages");
+                    Utils.sleep(100);
+                    key = 0;
+                }
+            }
+        }
+        System.out.println("Producer shut down now, sent total " + messageCounter  + " of requested " + totalMessagesToProduce);
+        System.out.flush();
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
new file mode 100644
index 0000000..0e3aa13
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class StreamsStandByReplicaTest {
+
+    public static void main(String[] args) {
+
+        System.out.println("StreamsTest instance started");
+
+        final String kafka = args.length > 0 ? args[0] : "localhost:9092";
+        final String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
+        final String additionalConfigs = args.length > 2 ? args[2] : null;
+
+        final Serde<String> stringSerde = Serdes.String();
+
+        final Properties streamsProperties = new Properties();
+        streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-standby-tasks");
+        streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsProperties.put(StreamsConfig.STATE_DIR_CONFIG, stateDirStr);
+        streamsProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
+
+        final Map<String, String> updated = SystemTestUtil.parseConfigs(additionalConfigs);
+        System.out.println("Updating configs with " + updated);
+
+        final String sourceTopic = updated.remove("sourceTopic");
+        final String sinkTopic1 = updated.remove("sinkTopic1");
+        final String sinkTopic2 = updated.remove("sinkTopic2");
+
+        if (sourceTopic == null || sinkTopic1 == null || sinkTopic2 == null) {
+            System.err.println(String.format("one or more required topics null sourceTopic[%s], sinkTopic1[%s], sinkTopic2[%s]", sourceTopic, sinkTopic1, sinkTopic2));
+            System.err.flush();
+            System.exit(1);
+        }
+
+        streamsProperties.putAll(updated);
+
+        if (!confirmCorrectConfigs(streamsProperties)) {
+            System.err.println(String.format("ERROR: Did not have all required configs expected  to contain %s, %s,  %s,  %s",
+                                             StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
+                                             StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG),
+                                             StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                                             StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG)));
+
+            System.exit(1);
+        }
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        String inMemoryStoreName = "in-memory-store";
+        String persistentMemoryStoreName = "persistent-memory-store";
+
+        KeyValueBytesStoreSupplier inMemoryStoreSupplier = Stores.inMemoryKeyValueStore(inMemoryStoreName);
+        KeyValueBytesStoreSupplier persistentStoreSupplier = Stores.persistentKeyValueStore(persistentMemoryStoreName);
+
+        KStream<String, String> inputStream = builder.stream(sourceTopic, Consumed.with(stringSerde, stringSerde));
+
+        ValueMapper<Long, String> countMapper = new ValueMapper<Long, String>() {
+            @Override
+            public String apply(final Long value) {
+                return value.toString();
+            }
+        };
+
+        inputStream.groupByKey().count(Materialized.<String, Long>as(inMemoryStoreSupplier)).toStream().mapValues(countMapper)
+            .to(sinkTopic1, Produced.with(stringSerde, stringSerde));
+
+        inputStream.groupByKey().count(Materialized.<String, Long>as(persistentStoreSupplier)).toStream().mapValues(countMapper)
+            .to(sinkTopic2, Produced.with(stringSerde, stringSerde));
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties);
+
+        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(final Thread t, final Throwable e) {
+                System.err.println("FATAL: An unexpected exception " + e);
+                e.printStackTrace(System.err);
+                System.err.flush();
+                shutdown(streams);
+            }
+        });
+
+        streams.setStateListener(new KafkaStreams.StateListener() {
+            @Override
+            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
+                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+                    final Set<ThreadMetadata> threadMetadata = streams.localThreadsMetadata();
+                    for (final ThreadMetadata threadMetadatum : threadMetadata) {
+                        System.out.println("ACTIVE_TASKS:" + threadMetadatum.activeTasks().size() + " STANDBY_TASKS:" + threadMetadatum.standbyTasks().size());
+                    }
+                }
+            }
+        });
+
+        System.out.println("Start Kafka Streams");
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                shutdown(streams);
+                System.out.println("Shut down streams now");
+            }
+        }));
+
+
+    }
+
+    private static void shutdown(final KafkaStreams streams) {
+        streams.close(10, TimeUnit.SECONDS);
+    }
+
+    private static boolean confirmCorrectConfigs(final Properties properties) {
+        return properties.containsKey(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) &&
+               properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG)) &&
+               properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG)) &&
+               properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG));
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtil.java
new file mode 100644
index 0000000..4ddbf69
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtil.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.tests;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Common convenience methods for Kafka Streams system tests.
+ */
+public class SystemTestUtil {
+
+    private static final int KEY = 0;
+    private static final int VALUE = 1;
+
+    /**
+     * Parses a string of {@code key=value} pairs separated by ',' into a map, for example
+     * {@code max.block.ms=5000,retries=6,request.timeout.ms=6000}. This makes it easier to pass
+     * configs from the Python system test to the Java test.
+     *
+     * @param formattedConfigs the formatted config string; must not be null
+     * @return a new HashMap containing the parsed keys and values
+     * @throws NullPointerException if {@code formattedConfigs} is null
+     * @throws IllegalStateException if the string contains no '=' at all, or a ','-separated
+     *         segment contains more than one '=' or is missing its '=' or its value
+     */
+    public static Map<String, String> parseConfigs(final String formattedConfigs) {
+        Objects.requireNonNull(formattedConfigs, "Formatted config String can't be null");
+
+        if (formattedConfigs.indexOf('=') == -1) {
+            throw new IllegalStateException(String.format("Provided string [ %s ] does not have expected key-value separator of '='", formattedConfigs));
+        }
+
+        final String[] parts = formattedConfigs.split(",");
+        final Map<String, String> configs = new HashMap<>();
+        for (final String part : parts) {
+            final String[] keyValue = part.split("=");
+            if (keyValue.length > 2) {
+                // more than one '=' in a segment: pairs were joined by something other than ','
+                throw new IllegalStateException(
+                    String.format("Provided string [ %s ] does not have expected key-value pair separator of ','", formattedConfigs));
+            }
+            if (keyValue.length < 2) {
+                // "foo", "foo=" or an empty segment: indexing VALUE would otherwise throw
+                // ArrayIndexOutOfBoundsException instead of the documented exception
+                throw new IllegalStateException(
+                    String.format("Provided string [ %s ] contains malformed key-value pair [ %s ]", formattedConfigs, part));
+            }
+            configs.put(keyValue[KEY], keyValue[VALUE]);
+        }
+        return configs;
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtilTest.java
new file mode 100644
index 0000000..f55450d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtilTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.tests;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link SystemTestUtil#parseConfigs(String)}.
+ */
+public class SystemTestUtilTest {
+
+    private final Map<String, String> expectedParsedMap = new TreeMap<>();
+
+    @Before
+    public void setUp() {
+        expectedParsedMap.put("foo", "foo1");
+        expectedParsedMap.put("bar", "bar1");
+        expectedParsedMap.put("baz", "baz1");
+    }
+
+    @Test
+    public void shouldParseCorrectMap() {
+        final String formattedConfigs = "foo=foo1,bar=bar1,baz=baz1";
+        final Map<String, String> parsedMap = SystemTestUtil.parseConfigs(formattedConfigs);
+        // Map.equals compares entries regardless of map implementation or iteration order;
+        // expected value goes first so failure messages read correctly.
+        assertEquals(expectedParsedMap, parsedMap);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowExceptionOnNull() {
+        SystemTestUtil.parseConfigs(null);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldThrowExceptionIfNotCorrectKeyValueSeparator() {
+        final String badString = "foo:bar,baz:boo";
+        SystemTestUtil.parseConfigs(badString);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldThrowExceptionIfNotCorrectKeyValuePairSeparator() {
+        final String badString = "foo=bar;baz=boo";
+        SystemTestUtil.parseConfigs(badString);
+    }
+
+    @Test
+    public void shouldParseSingleKeyValuePairString() {
+        final Map<String, String> expectedSinglePairMap = new HashMap<>();
+        expectedSinglePairMap.put("foo", "bar");
+        final String singleValueString = "foo=bar";
+        final Map<String, String> parsedMap = SystemTestUtil.parseConfigs(singleValueString);
+        assertEquals(expectedSinglePairMap, parsedMap);
+    }
+}
\ No newline at end of file
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 9df87ca..e552a39 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -18,7 +18,6 @@ import signal
 
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
-
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 
 
@@ -236,3 +235,19 @@ class StreamsBrokerDownResilienceService(StreamsTestBaseService):
               " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
 
         return cmd
+
+
+class StreamsStandbyTaskService(StreamsTestBaseService):
+    def __init__(self, test_context, kafka, configs):
+        super(StreamsStandbyTaskService, self).__init__(test_context,
+                                                        kafka,
+                                                        "org.apache.kafka.streams.tests.StreamsStandByReplicaTest",
+                                                        configs)
+
+
+class StreamsRepeatingIntegerKeyProducerService(StreamsTestBaseService):
+    def __init__(self, test_context, kafka, configs):
+        super(StreamsRepeatingIntegerKeyProducerService, self).__init__(test_context,
+                                                                        kafka,
+                                                                        "org.apache.kafka.streams.tests.StreamsRepeatingIntegerKeyProducer",
+                                                                        configs)
diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
new file mode 100644
index 0000000..533d4b5
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -0,0 +1,166 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.streams import StreamsRepeatingIntegerKeyProducerService
+from kafkatest.services.streams import StreamsStandbyTaskService
+from kafkatest.services.verifiable_consumer import VerifiableConsumer
+from kafkatest.services.zookeeper import ZookeeperService
+
+
+class StreamsStandbyTask(Test):
+    """
+    This test validates using standby tasks helps with rebalance times
+    additionally verifies standby replicas continue to work in the
+    face of continual changes to streams code base
+    """
+
+    streams_source_topic = "standbyTaskSource1"
+    streams_sink_topic_1 = "standbyTaskSink1"
+    streams_sink_topic_2 = "standbyTaskSink2"
+
+    num_messages = 60000
+
+    def __init__(self, test_context):
+        super(StreamsStandbyTask, self).__init__(test_context=test_context)
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context,
+                                  num_nodes=3,
+                                  zk=self.zk,
+                                  topics={
+                                      self.streams_source_topic: {'partitions': 6, 'replication-factor': 1},
+                                      self.streams_sink_topic_1: {'partitions': 1, 'replication-factor': 1},
+                                      self.streams_sink_topic_2: {'partitions': 1, 'replication-factor': 1}
+                                  })
+
+    def get_consumer(self, topic, num_messages):
+        return VerifiableConsumer(self.test_context,
+                                  1,
+                                  self.kafka,
+                                  topic,
+                                  "stream-broker-resilience-verify-consumer",
+                                  max_messages=num_messages)
+
+    def assert_consume(self, test_state, topic, num_messages=5):
+        consumer = self.get_consumer(topic, num_messages)
+        consumer.start()
+
+        wait_until(lambda: consumer.total_consumed() >= num_messages,
+                   timeout_sec=120,
+                   err_msg="At %s streams did not process messages in 60 seconds " % test_state)
+
+    @staticmethod
+    def get_configs(extra_configs=""):
+        # Consumer max.poll.interval > min(max.block.ms, ((retries + 1) * request.timeout)
+        consumer_poll_ms = "consumer.max.poll.interval.ms=50000"
+        retries_config = "producer.retries=2"
+        request_timeout = "producer.request.timeout.ms=15000"
+        max_block_ms = "producer.max.block.ms=30000"
+
+        # java code expects configs in key=value,key=value format
+        updated_configs = consumer_poll_ms + "," + retries_config + "," + request_timeout + "," + max_block_ms + extra_configs
+
+        return updated_configs
+
+    def wait_for_verification(self, processor, message, file, num_lines=1, timeout_sec=20):
+        wait_until(lambda: self.verify_from_file(processor, message, file) >= num_lines,
+                   timeout_sec=timeout_sec,
+                   err_msg="Did expect to read '%s' from %s" % (message, processor.node.account))
+
+    @staticmethod
+    def verify_from_file(processor, message, file):
+        result = processor.node.account.ssh_output("grep '%s' %s | wc -l" % (message, file), allow_fail=False)
+        return int(result)
+
+
+    def setUp(self):
+        self.zk.start()
+        self.kafka.start()
+
+    def test_standby_tasks_rebalance(self):
+
+        driver_configs = "num_messages=%s,topics=%s" % (str(self.num_messages), self.streams_source_topic)
+
+        driver = StreamsRepeatingIntegerKeyProducerService(self.test_context, self.kafka, driver_configs)
+        driver.start()
+
+        configs = self.get_configs(",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s" % (self.streams_source_topic,
+                                                                                    self.streams_sink_topic_1,
+                                                                                    self.streams_sink_topic_2))
+
+        processor_1 = StreamsStandbyTaskService(self.test_context, self.kafka, configs)
+        processor_2 = StreamsStandbyTaskService(self.test_context, self.kafka, configs)
+        processor_3 = StreamsStandbyTaskService(self.test_context, self.kafka, configs)
+
+        processor_1.start()
+
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:6 STANDBY_TASKS:0", processor_1.STDOUT_FILE)
+
+        processor_2.start()
+
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_2.STDOUT_FILE)
+
+        processor_3.start()
+
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE)
+
+        processor_1.stop()
+
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_2.STDOUT_FILE, num_lines=2)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_3.STDOUT_FILE)
+
+        processor_2.stop()
+
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:6 STANDBY_TASKS:0", processor_3.STDOUT_FILE)
+
+        processor_1.start()
+
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_3.STDOUT_FILE, num_lines=2)
+
+        processor_2.start()
+
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE, num_lines=2)
+
+        processor_3.stop()
+
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_1.STDOUT_FILE, num_lines=2)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_2.STDOUT_FILE)
+
+        processor_1.stop()
+
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:6 STANDBY_TASKS:0", processor_2.STDOUT_FILE)
+
+        processor_3.start()
+        processor_1.start()
+
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE, num_lines=2)
+
+        self.assert_consume("assert all messages consumed from %s" % self.streams_sink_topic_1, self.streams_sink_topic_1, self.num_messages)
+        self.assert_consume("assert all messages consumed from %s" % self.streams_sink_topic_2, self.streams_sink_topic_2, self.num_messages)
+
+        self.wait_for_verification(driver, "Producer shut down now, sent total {0} of requested {0}".format(str(self.num_messages)),
+                                   driver.STDOUT_FILE)
+
+

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.