You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/12/03 20:37:44 UTC
[kafka] branch trunk updated: MINOR: Adding system test for named
repartition topics (#5913)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ab1fb3f MINOR: Adding system test for named repartition topics (#5913)
ab1fb3f is described below
commit ab1fb3fdde95be535b97eb1c8c21ef32896f891a
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Mon Dec 3 15:37:31 2018 -0500
MINOR: Adding system test for named repartition topics (#5913)
This is a system test for doing a rolling upgrade of a topology with a named repartition topic.
1. An initial Kafka Streams application is started on 3 nodes. The topology has one operation forcing a repartition and the repartition topic is explicitly named.
2. Each node is started and processing of data is validated
3. Then one node is stopped (full stop is verified)
4. A property is set signaling the node to add operations to the topology before the repartition node, which forces a renumbering of all operators (except the repartition node)
5. The node is restarted and record processing is confirmed
6. The steps are repeated for the other 2 nodes, completing the rolling upgrade
I ran the system test twice, with 25 repeats per run, for a total of 50 test runs.
All test runs passed.
Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../streams/tests/StreamsNamedRepartitionTest.java | 120 +++++++++++++++++++++
tests/kafkatest/services/streams.py | 22 ++++
.../streams_named_repartition_topic_test.py | 119 ++++++++++++++++++++
3 files changed, 261 insertions(+)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
new file mode 100644
index 0000000..660de39
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Function;
+
+/**
+ * System-test driver for a Kafka Streams application whose topology contains an explicitly named
+ * repartition topic.
+ *
+ * <p>The application re-keys every record of {@code input.topic} by its value modulo 9, optionally
+ * inserts extra operators ahead of the repartition step ({@code add.operations=true}), sums the
+ * values per key and writes the sums to {@code aggregation.topic}. Marker lines are printed to
+ * stdout so the Python system test can follow the instance's lifecycle.
+ */
+public class StreamsNamedRepartitionTest {
+
+    /**
+     * Starts the Kafka Streams application.
+     *
+     * @param args {@code args[0]} is the path of a properties file; it must define {@code input.topic},
+     *             {@code aggregation.topic} and {@code add.operations}, and every other entry is passed
+     *             to Kafka Streams as configuration
+     * @throws Exception propagated from loading the properties file
+     */
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 1) {
+            System.err.println("StreamsNamedRepartitionTest requires one argument (properties-file) but none provided: ");
+            // Without an explicit exit, args[0] below would fail with an ArrayIndexOutOfBoundsException.
+            System.exit(1);
+        }
+        final String propFileName = args[0];
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        System.out.println("StreamsTest instance started NAMED_REPARTITION_TEST");
+        System.out.println("props=" + streamsProperties);
+
+        // The test-specific keys are removed so that only the remaining entries reach config.putAll() below.
+        final String inputTopic = (String) Objects.requireNonNull(
+            streamsProperties.remove("input.topic"), "missing required property 'input.topic'");
+        final String aggregationTopic = (String) Objects.requireNonNull(
+            streamsProperties.remove("aggregation.topic"), "missing required property 'aggregation.topic'");
+        final boolean addOperators = Boolean.parseBoolean((String) Objects.requireNonNull(
+            streamsProperties.remove("add.operations"), "missing required property 'add.operations'"));
+
+        final Initializer<Integer> initializer = () -> 0;
+        final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + Integer.parseInt(v);
+
+        // New key = value mod 9; changing the key is what forces a repartition before the aggregation.
+        final Function<String, String> keyFunction = s -> Integer.toString(Integer.parseInt(s) % 9);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+
+        final KStream<String, String> mappedStream = sourceStream.selectKey((k, v) -> keyFunction.apply(v));
+
+        // When enabled, extra operators are added ahead of the repartition step; this renumbers the
+        // generated operator names, while the explicitly named repartition topic keeps its name.
+        final KStream<String, String> maybeUpdatedStream;
+        if (addOperators) {
+            maybeUpdatedStream = mappedStream.filter((k, v) -> true).mapValues(v -> Integer.toString(Integer.parseInt(v) + 1));
+        } else {
+            maybeUpdatedStream = mappedStream;
+        }
+
+        // "grouped-stream" is the explicit name that the repartition topic is derived from.
+        maybeUpdatedStream.groupByKey(Grouped.with("grouped-stream", Serdes.String(), Serdes.String()))
+            .aggregate(initializer, aggregator, Materialized.with(Serdes.String(), Serdes.Integer()))
+            .toStream()
+            .peek((k, v) -> System.out.println(String.format("AGGREGATED key=%s value=%s", k, v)))
+            .to(aggregationTopic, Produced.with(Serdes.String(), Serdes.Integer()));
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsNamedRepartitionTest");
+        config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
+        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        // Entries from the properties file are applied last, so they override the settings above.
+        config.putAll(streamsProperties);
+
+        final Topology topology = builder.build(config);
+        final KafkaStreams streams = new KafkaStreams(topology, config);
+
+        // The system test waits for one of these markers after each (re)start of the instance.
+        streams.setStateListener((oldState, newState) -> {
+            if (oldState == State.REBALANCING && newState == State.RUNNING) {
+                if (addOperators) {
+                    System.out.println("UPDATED Topology");
+                } else {
+                    System.out.println("REBALANCING -> RUNNING");
+                }
+                System.out.flush();
+            }
+        });
+
+        // Registered before start() so that a shutdown requested during startup still closes the instance.
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            System.out.println("closing Kafka Streams instance");
+            System.out.flush();
+            streams.close(Duration.ofMillis(5000));
+            System.out.println("NAMED_REPARTITION_TEST Streams Stopped");
+            System.out.flush();
+        }));
+
+        streams.start();
+    }
+
+}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index fb43dfb..29a235e 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -506,3 +506,25 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
self.logger.info("Executing: " + cmd)
return cmd
+
+
+class StreamsNamedRepartitionTopicService(StreamsTestBaseService):
+ def __init__(self, test_context, kafka):
+ super(StreamsNamedRepartitionTopicService, self).__init__(test_context,
+ kafka,
+ "org.apache.kafka.streams.tests.StreamsNamedRepartitionTest",
+ "")
+ self.ADD_ADDITIONAL_OPS = 'false'
+ self.INPUT_TOPIC = None
+ self.AGGREGATION_TOPIC = None
+
+ def prop_file(self):
+ properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+ streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()}
+
+ properties['input.topic'] = self.INPUT_TOPIC
+ properties['aggregation.topic'] = self.AGGREGATION_TOPIC
+ properties['add.operations'] = self.ADD_ADDITIONAL_OPS
+
+ cfg = KafkaConfig(**properties)
+ return cfg.render()
diff --git a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
new file mode 100644
index 0000000..5baf612
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
@@ -0,0 +1,119 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.streams import StreamsNamedRepartitionTopicService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.zookeeper import ZookeeperService
+
+
+class StreamsNamedRepartitionTopicTest(Test):
+ """
+ Tests using a named repartition topic by starting
+ application then doing a rolling upgrade with added
+ operations and the application still runs
+ """
+
+ input_topic = 'inputTopic'
+ aggregation_topic = 'aggregationTopic'
+ pattern = 'AGGREGATED'
+
+ def __init__(self, test_context):
+ super(StreamsNamedRepartitionTopicTest, self).__init__(test_context)
+ self.topics = {
+ self.input_topic: {'partitions': 6},
+ self.aggregation_topic: {'partitions': 6}
+ }
+
+ self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
+ self.kafka = KafkaService(self.test_context, num_nodes=1,
+ zk=self.zookeeper, topics=self.topics)
+
+ self.producer = VerifiableProducer(self.test_context,
+ 1,
+ self.kafka,
+ self.input_topic,
+ throughput=1000,
+ acks=1)
+
+ def test_upgrade_topology_with_named_repartition_topic(self):
+ self.zookeeper.start()
+ self.kafka.start()
+
+ processor1 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
+ processor2 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
+ processor3 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
+
+ processors = [processor1, processor2, processor3]
+
+ self.producer.start()
+
+ for processor in processors:
+ processor.CLEAN_NODE_ENABLED = False
+ self.set_topics(processor)
+ processor.start()
+ self.verify_running(processor, 'REBALANCING -> RUNNING')
+
+ self.verify_processing(processors)
+
+ # do rolling upgrade
+ for processor in processors:
+ self.verify_stopped(processor)
+ # will tell app to add operations before repartition topic
+ processor.ADD_ADDITIONAL_OPS = 'true'
+ processor.start()
+ self.verify_running(processor, 'UPDATED Topology')
+
+ self.verify_processing(processors)
+
+ self.stop_processors(processors)
+
+ self.producer.stop()
+ self.kafka.stop()
+ self.zookeeper.stop()
+
+ @staticmethod
+ def verify_running(processor, message):
+ with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+ monitor.wait_until(message,
+ timeout_sec=60,
+ err_msg="Never saw '%s' message " % message + str(processor.node.account))
+
+ @staticmethod
+ def verify_stopped(processor):
+ node = processor.node
+ with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+ processor.stop()
+ monitor.wait_until('NAMED_REPARTITION_TEST Streams Stopped',
+ timeout_sec=60,
+ err_msg="'NAMED_REPARTITION_TEST Streams Stopped' message" + str(processor.node.account))
+
+ def verify_processing(self, processors):
+ for processor in processors:
+ with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+ monitor.wait_until(self.pattern,
+ timeout_sec=60,
+ err_msg="Never saw processing of %s " % self.pattern + str(processor.node.account))
+
+ def stop_processors(self, processors):
+ for processor in processors:
+ self.verify_stopped(processor)
+
+ def set_topics(self, processor):
+ processor.INPUT_TOPIC = self.input_topic
+ processor.AGGREGATION_TOPIC = self.aggregation_topic