You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/12/03 20:37:44 UTC

[kafka] branch trunk updated: MINOR: Adding system test for named repartition topics (#5913)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ab1fb3f  MINOR: Adding system test for named repartition topics (#5913)
ab1fb3f is described below

commit ab1fb3fdde95be535b97eb1c8c21ef32896f891a
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Mon Dec 3 15:37:31 2018 -0500

    MINOR: Adding system test for named repartition topics (#5913)
    
    This is a system test for doing a rolling upgrade of a topology with a named repartition topic.
    
    1. An initial Kafka Streams application is started on 3 nodes. The topology has one operation forcing a repartition and the repartition topic is explicitly named.
    2. Each node is started and processing of data is validated
    3. Then one node is stopped (full stop is verified)
    4. A property is set signaling the node to add operations to the topology before the repartition node, which forces a renumbering of all operators (except the repartition node)
    5. Restart the node and confirm that it is processing records
    6. Repeat the steps for the other 2 nodes completing the rolling upgrade
    
    I ran two runs of the system test with 25 repeats in each run for a total of 50 test runs.
    All test runs passed.
    
    Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../streams/tests/StreamsNamedRepartitionTest.java | 120 +++++++++++++++++++++
 tests/kafkatest/services/streams.py                |  22 ++++
 .../streams_named_repartition_topic_test.py        | 119 ++++++++++++++++++++
 3 files changed, 261 insertions(+)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
new file mode 100644
index 0000000..660de39
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Function;
+
+/** System-test client that runs a Streams topology whose grouping (repartition) step is explicitly named. */
+public class StreamsNamedRepartitionTest {
+
+    /**
+     * Builds the topology from the given properties file and starts it; a JVM shutdown hook closes it.
+     * @param args {@code args[0]} is the properties file; it must define input.topic, aggregation.topic and add.operations
+     * @throws Exception whatever loading the properties file or building the topology throws
+     */
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 1) {
+            System.err.println("StreamsNamedRepartitionTest requires one argument (properties-file) but none provided: ");
+            // Stop here: reading args[0] below would otherwise fail with ArrayIndexOutOfBoundsException.
+            System.exit(1);
+        }
+        final String propFileName = args[0];
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        System.out.println("StreamsTest instance started NAMED_REPARTITION_TEST");
+        System.out.println("props=" + streamsProperties);
+
+        // Test-only settings are removed here, so only the remaining entries are copied into the Streams config.
+        final String inputTopic = (String) (Objects.requireNonNull(streamsProperties.remove("input.topic")));
+        final String aggregationTopic = (String) (Objects.requireNonNull(streamsProperties.remove("aggregation.topic")));
+        final boolean addOperators = Boolean.parseBoolean(Objects.requireNonNull((String) streamsProperties.remove("add.operations")));
+
+        final Initializer<Integer> initializer = () -> 0;
+        final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + Integer.parseInt(v);
+        final Function<String, String> keyFunction = s -> Integer.toString(Integer.parseInt(s) % 9);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> mappedStream = sourceStream.selectKey((k, v) -> keyFunction.apply(v));
+
+        // When requested, extra operators are inserted ahead of the named grouping step.
+        final KStream<String, String> maybeUpdatedStream;
+        if (addOperators) {
+            maybeUpdatedStream = mappedStream.filter((k, v) -> true).mapValues(v -> Integer.toString(Integer.parseInt(v) + 1));
+        } else {
+            maybeUpdatedStream = mappedStream;
+        }
+
+        maybeUpdatedStream.groupByKey(Grouped.with("grouped-stream", Serdes.String(), Serdes.String()))
+            .aggregate(initializer, aggregator, Materialized.with(Serdes.String(), Serdes.Integer()))
+            .toStream()
+            .peek((k, v) -> System.out.println(String.format("AGGREGATED key=%s value=%s", k, v)))
+            .to(aggregationTopic, Produced.with(Serdes.String(), Serdes.Integer()));
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsNamedRepartitionTest");
+        config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
+        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        // Entries from the properties file are applied last, so they override the defaults set above.
+        config.putAll(streamsProperties);
+
+        final Topology topology = builder.build(config);
+        final KafkaStreams streams = new KafkaStreams(topology, config);
+
+        streams.setStateListener((oldState, newState) -> {
+            if (oldState == State.REBALANCING && newState == State.RUNNING) {
+                if (addOperators) {
+                    System.out.println("UPDATED Topology");
+                } else {
+                    System.out.println("REBALANCING -> RUNNING");
+                }
+                System.out.flush();
+            }
+        });
+
+        // Register the hook before starting so a shutdown arriving during startup still closes the instance.
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            System.out.println("closing Kafka Streams instance");
+            System.out.flush();
+            streams.close(Duration.ofMillis(5000));
+            System.out.println("NAMED_REPARTITION_TEST Streams Stopped");
+            System.out.flush();
+        }));
+
+        streams.start();
+    }
+}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index fb43dfb..29a235e 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -506,3 +506,25 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
         self.logger.info("Executing: " + cmd)
 
         return cmd
+
+
+class StreamsNamedRepartitionTopicService(StreamsTestBaseService):
+    def __init__(self, test_context, kafka):
+        super(StreamsNamedRepartitionTopicService, self).__init__(test_context,
+                                                                  kafka,
+                                                                  "org.apache.kafka.streams.tests.StreamsNamedRepartitionTest",
+                                                                  "")
+        self.ADD_ADDITIONAL_OPS = 'false'
+        self.INPUT_TOPIC = None
+        self.AGGREGATION_TOPIC = None
+
+    def prop_file(self):
+        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()}
+
+        properties['input.topic'] = self.INPUT_TOPIC
+        properties['aggregation.topic'] = self.AGGREGATION_TOPIC
+        properties['add.operations'] = self.ADD_ADDITIONAL_OPS
+
+        cfg = KafkaConfig(**properties)
+        return cfg.render()
diff --git a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
new file mode 100644
index 0000000..5baf612
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
@@ -0,0 +1,119 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.streams import StreamsNamedRepartitionTopicService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.zookeeper import ZookeeperService
+
+
+class StreamsNamedRepartitionTopicTest(Test):
+    """
+    Tests using a named repartition topic by starting
+    application then doing a rolling upgrade with added
+    operations and the application still runs
+    """
+
+    input_topic = 'inputTopic'
+    aggregation_topic = 'aggregationTopic'
+    pattern = 'AGGREGATED'
+
+    def __init__(self, test_context):
+        super(StreamsNamedRepartitionTopicTest, self).__init__(test_context)
+        self.topics = {
+            self.input_topic: {'partitions': 6},
+            self.aggregation_topic: {'partitions': 6}
+        }
+
+        self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
+        self.kafka = KafkaService(self.test_context, num_nodes=1,
+                                  zk=self.zookeeper, topics=self.topics)
+
+        self.producer = VerifiableProducer(self.test_context,
+                                           1,
+                                           self.kafka,
+                                           self.input_topic,
+                                           throughput=1000,
+                                           acks=1)
+
+    def test_upgrade_topology_with_named_repartition_topic(self):
+        self.zookeeper.start()
+        self.kafka.start()
+
+        processor1 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
+        processor2 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
+        processor3 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
+
+        processors = [processor1, processor2, processor3]
+
+        self.producer.start()
+
+        for processor in processors:
+            processor.CLEAN_NODE_ENABLED = False
+            self.set_topics(processor)
+            processor.start()
+            self.verify_running(processor, 'REBALANCING -> RUNNING')
+
+        self.verify_processing(processors)
+
+        # do rolling upgrade
+        for processor in processors:
+            self.verify_stopped(processor)
+            #  will tell app to add operations before repartition topic
+            processor.ADD_ADDITIONAL_OPS = 'true'
+            processor.start()
+            self.verify_running(processor, 'UPDATED Topology')
+
+        self.verify_processing(processors)
+
+        self.stop_processors(processors)
+
+        self.producer.stop()
+        self.kafka.stop()
+        self.zookeeper.stop()
+
+    @staticmethod
+    def verify_running(processor, message):
+        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+            monitor.wait_until(message,
+                               timeout_sec=60,
+                               err_msg="Never saw '%s' message " % message + str(processor.node.account))
+
+    @staticmethod
+    def verify_stopped(processor):
+        node = processor.node
+        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+            processor.stop()
+            monitor.wait_until('NAMED_REPARTITION_TEST Streams Stopped',
+                               timeout_sec=60,
+                               err_msg="'NAMED_REPARTITION_TEST Streams Stopped' message" + str(processor.node.account))
+
+    def verify_processing(self, processors):
+        for processor in processors:
+            with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+                monitor.wait_until(self.pattern,
+                                   timeout_sec=60,
+                                   err_msg="Never saw processing of %s " % self.pattern + str(processor.node.account))
+
+    def stop_processors(self, processors):
+        for processor in processors:
+            self.verify_stopped(processor)
+
+    def set_topics(self, processor):
+        processor.INPUT_TOPIC = self.input_topic
+        processor.AGGREGATION_TOPIC = self.aggregation_topic