Posted to commits@kafka.apache.org by da...@apache.org on 2017/06/24 06:48:15 UTC

kafka git commit: KAFKA-5487; upgrade and downgrade streams app system test

Repository: kafka
Updated Branches:
  refs/heads/trunk d5e463b9d -> ee5eac715


KAFKA-5487; upgrade and downgrade streams app system test

-Tests for rolling upgrades of a streams app (keeping broker config fixed)
-Tests for rolling upgrades of brokers (keeping streams app config fixed)

Author: Eno Thereska <en...@gmail.com>

Reviewers: Matthias J. Sax <ma...@confluent.io>, Damian Guy <da...@gmail.com>

Closes #3411 from enothereska/KAFKA-5487-upgrade-test-streams


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ee5eac71
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ee5eac71
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ee5eac71

Branch: refs/heads/trunk
Commit: ee5eac715d58e6b16a115692ede93ae481ae7785
Parents: d5e463b
Author: Eno Thereska <en...@gmail.com>
Authored: Sat Jun 24 07:48:10 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Sat Jun 24 07:48:10 2017 +0100

----------------------------------------------------------------------
 .../tests/streams/streams_upgrade_test.py       | 164 +++++++++++++++++++
 vagrant/base.sh                                 |   5 +
 2 files changed, 169 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ee5eac71/tests/kafkatest/tests/streams/streams_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
new file mode 100644
index 0000000..16a8489
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -0,0 +1,164 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark.resource import cluster
+from ducktape.tests.test import Test
+from ducktape.mark import parametrize, ignore
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+from kafkatest.version import LATEST_0_10_1, LATEST_0_10_2, DEV_BRANCH, KafkaVersion
+import time
+
+
+class StreamsUpgradeTest(Test):
+    """
+    Tests rolling upgrades and downgrades of the Kafka Streams library.
+    """
+
+    def __init__(self, test_context):
+        super(StreamsUpgradeTest, self).__init__(test_context)
+        self.replication = 3
+        self.partitions = 1
+        self.isr = 2
+        self.topics = {
+            'echo' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                       'configs': {"min.insync.replicas": self.isr}},
+            'data' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                       'configs': {"min.insync.replicas": self.isr} },
+            'min' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                      'configs': {"min.insync.replicas": self.isr} },
+            'max' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                      'configs': {"min.insync.replicas": self.isr} },
+            'sum' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                      'configs': {"min.insync.replicas": self.isr} },
+            'dif' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                      'configs': {"min.insync.replicas": self.isr} },
+            'cnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                      'configs': {"min.insync.replicas": self.isr} },
+            'avg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                      'configs': {"min.insync.replicas": self.isr} },
+            'wcnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                       'configs': {"min.insync.replicas": self.isr} },
+            'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                       'configs': {"min.insync.replicas": self.isr} }
+        }
+        
+
+    def perform_streams_upgrade(self, to_version):
+        self.logger.info("First pass bounce - rolling streams upgrade")
+
+        # get the node running the streams app
+        node = self.processor1.node
+        self.processor1.stop()
+
+        # change its version; this will automatically make it pick up a different
+        # JAR when it starts again
+        node.version = KafkaVersion(to_version)
+        self.processor1.start()
+
+    def perform_broker_upgrade(self, to_version):
+        self.logger.info("First pass bounce - rolling broker upgrade")
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            node.version = KafkaVersion(to_version)
+            self.kafka.start_node(node)
+
+
+    @cluster(num_nodes=6)
+    @parametrize(from_version=str(LATEST_0_10_1), to_version=str(DEV_BRANCH))
+    @parametrize(from_version=str(LATEST_0_10_2), to_version=str(DEV_BRANCH))
+    @parametrize(from_version=str(DEV_BRANCH), to_version=str(LATEST_0_10_2))
+    def test_upgrade_downgrade_streams(self, from_version, to_version):
+        """
+        Start a smoke test client, then stop it, change its version, and restart it
+        (a rolling bounce). Ensure that all records are delivered.
+
+        Note that, just like tests/core/upgrade_test.py, a prerequisite for this test to succeed
+        is the inclusion of all parametrized versions of kafka in kafka/vagrant/base.sh
+        (search for get_kafka()). For streams in particular, that means that someone has manually
+        copied the kafka-streams-$version-test.jar into the right S3 bucket, as shown in base.sh.
+        """
+        # Setup phase
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.zk.start()
+
+        # number of nodes needs to be >= 3 for the smoke test
+        self.kafka = KafkaService(self.test_context, num_nodes=3,
+                                  zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
+        self.kafka.start()
+        
+        # allow some time for topics to be created
+        time.sleep(10)
+        
+        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
+        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
+
+        
+        self.driver.start()
+        self.processor1.start()
+        time.sleep(15)
+
+        self.perform_streams_upgrade(to_version)
+
+        time.sleep(15)
+        self.driver.wait()
+        self.driver.stop()
+
+        self.processor1.stop()
+
+        node = self.driver.node
+        node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
+        self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
+
+
+
+    @cluster(num_nodes=6)
+    @parametrize(from_version=str(LATEST_0_10_2), to_version=str(DEV_BRANCH))
+    def test_upgrade_brokers(self, from_version, to_version):
+        """
+        Start a smoke test client, then perform a rolling upgrade of the brokers.
+        """
+        # Setup phase
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.zk.start()
+
+        # number of nodes needs to be >= 3 for the smoke test
+        self.kafka = KafkaService(self.test_context, num_nodes=3,
+                                  zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
+        self.kafka.start()
+        
+        # allow some time for topics to be created
+        time.sleep(10)
+        
+        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
+        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
+
+        
+        self.driver.start()
+        self.processor1.start()
+        time.sleep(15)
+
+        self.perform_broker_upgrade(to_version)
+
+        time.sleep(15)
+        self.driver.wait()
+        self.driver.stop()
+
+        self.processor1.stop()
+
+        node = self.driver.node
+        node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
+        self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee5eac71/vagrant/base.sh
----------------------------------------------------------------------
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 4c0add5..7c0b5ed 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -64,15 +64,20 @@ get_kafka() {
 
     kafka_dir=/opt/kafka-$version
     url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_$scala_version-$version.tgz
+    # the .tgz above does not include the streams test jar, hence we need to get it separately
+    url_streams_test=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka-streams-$version-test.jar
     if [ ! -d /opt/kafka-$version ]; then
         pushd /tmp
         curl -O $url
+        curl -O $url_streams_test || true
         file_tgz=`basename $url`
+        file_streams_jar=`basename $url_streams_test` || true
         tar -xzf $file_tgz
         rm -rf $file_tgz
 
         file=`basename $file_tgz .tgz`
         mv $file $kafka_dir
+        mv $file_streams_jar $kafka_dir/libs || true
         popd
     fi
 }
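
With this change each provisioned worker ends up with the streams test jar under
/opt/kafka-$version/libs, for every version whose jar has been uploaded to the S3
bucket (the `|| true` guards let provisioning proceed for versions without one).
A hypothetical sanity check, not part of this patch, using the same ssh helper the
tests above rely on (`node` and `version` as in the test code):

    # fail fast if the streams test jar was not unpacked into the
    # versioned install on this worker node
    node.account.ssh("ls /opt/kafka-%s/libs/kafka-streams-%s-test.jar" % (version, version),
                     allow_fail=False)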