Posted to commits@kafka.apache.org by gu...@apache.org on 2018/03/15 21:42:48 UTC

[kafka] branch trunk updated: MINOR: Streams system tests fixes/updates (#4689)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7006d0f  MINOR: Streams system tests fixes/updates (#4689)
7006d0f is described below

commit 7006d0f58b9a72f181b13bab6f1f64b0e7b4117e
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Mar 15 14:42:43 2018 -0700

    MINOR: Streams system tests fixes/updates (#4689)
    
    Some changes required to get the Streams system tests working via Docker
    
    To test:
    
    TC_PATHS="tests/kafkatest/tests/streams" bash tests/docker/run_tests.sh
    
    That command will take about 3.5 hours, and should pass. Note there are a couple of ignored tests.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Bill Bejeck <bi...@confluent.io>
---
 .gitignore                                         |   4 +
 tests/docker/Dockerfile                            |  23 +++-
 tests/kafkatest/services/streams.py                |   5 +-
 .../tests/streams/streams_upgrade_test.py          | 132 ++++++++++-----------
 tests/kafkatest/version.py                         |   9 +-
 tests/setup.py                                     |   2 +-
 vagrant/base.sh                                    |   8 +-
 7 files changed, 105 insertions(+), 78 deletions(-)

diff --git a/.gitignore b/.gitignore
index 6088349..ba594ff 100644
--- a/.gitignore
+++ b/.gitignore
@@ -47,3 +47,7 @@ tests/venv
 docs/generated/
 
 .release-settings.json
+
+kafkatest.egg-info/
+systest/
+
diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile
index 149e391..57ca242 100644
--- a/tests/docker/Dockerfile
+++ b/tests/docker/Dockerfile
@@ -40,12 +40,23 @@ RUN ssh-keygen -q -t rsa -N '' -f /root/.ssh/id_rsa && cp -f /root/.ssh/id_rsa.p
 
 # Install binary test dependencies.
 ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages"
-RUN mkdir -p "/opt/kafka-0.8.2.2" && curl -s "$KAFKA_MIRROR/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2"
-RUN mkdir -p "/opt/kafka-0.9.0.1" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1"
-RUN mkdir -p "/opt/kafka-0.10.0.1" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1"
-RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1"
-RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1"
-RUN mkdir -p "/opt/kafka-0.11.0.0" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0"
+RUN mkdir -p "/opt/kafka-0.8.2.2" && chmod a+rw /opt/kafka-0.8.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2"
+RUN mkdir -p "/opt/kafka-0.9.0.1" && chmod a+rw /opt/kafka-0.9.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1"
+RUN mkdir -p "/opt/kafka-0.10.0.1" && chmod a+rw /opt/kafka-0.10.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1"
+RUN mkdir -p "/opt/kafka-0.10.1.1" && chmod a+rw /opt/kafka-0.10.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1"
+RUN mkdir -p "/opt/kafka-0.10.2.1" && chmod a+rw /opt/kafka-0.10.2.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1"
+RUN mkdir -p "/opt/kafka-0.11.0.0" && chmod a+rw /opt/kafka-0.11.0.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0"
+RUN mkdir -p "/opt/kafka-0.11.0.2" && chmod a+rw /opt/kafka-0.11.0.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.2"
+RUN mkdir -p "/opt/kafka-1.0.0" && chmod a+rw /opt/kafka-1.0.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.0"
+RUN mkdir -p "/opt/kafka-1.0.1" && chmod a+rw /opt/kafka-1.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.1"
+
+# Streams test dependencies
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.1.1-test.jar" -o /opt/kafka-0.10.1.1/libs/kafka-streams-0.10.1.1-test.jar && \
+    curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.1-test.jar" -o /opt/kafka-0.10.2.1/libs/kafka-streams-0.10.2.1-test.jar && \
+    curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.0-test.jar" -o /opt/kafka-0.11.0.0/libs/kafka-streams-0.11.0.0-test.jar && \
+    curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.2-test.jar" -o /opt/kafka-0.11.0.2/libs/kafka-streams-0.11.0.2-test.jar && \
+    curl -s "$KAFKA_MIRROR/kafka-streams-1.0.0-test.jar" -o /opt/kafka-1.0.0/libs/kafka-streams-1.0.0-test.jar && \
+    curl -s "$KAFKA_MIRROR/kafka-streams-1.0.1-test.jar" -o /opt/kafka-1.0.1/libs/kafka-streams-1.0.1-test.jar
 
 # The version of Kibosh to use for testing.
 # If you update this, also update vagrant/base.sh
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index e552a39..e886c94 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -123,6 +123,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
               "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
               " %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
               " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+        self.logger.info("Executing: " + cmd)
 
         return cmd
 
@@ -163,7 +164,7 @@ class StreamsEosTestBaseService(StreamsTestBaseService):
 
     def clean_node(self, node):
         if self.clean_node_enabled:
-            super.clean_node(self, node)
+            super(StreamsEosTestBaseService, self).clean_node(node)
 
 
 class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
@@ -233,7 +234,7 @@ class StreamsBrokerDownResilienceService(StreamsTestBaseService):
               "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
               " %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
               " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
-
+        self.logger.info("Executing: " + cmd)
         return cmd
 
 
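The streams.py change above also fixes a broken super call: super.clean_node(self, node) refers to the built-in super type itself (it is never called), so cleaning an EOS test node raised an AttributeError instead of delegating to the base service. The replacement uses the explicit two-argument form, which works on both Python 2 and Python 3. A minimal sketch of the pattern, using hypothetical Base/EosService classes rather than the actual kafkatest services:

    class Base(object):
        def clean_node(self, node):
            print("cleaning %s" % node)

    class EosService(Base):
        clean_node_enabled = True

        def clean_node(self, node):
            if self.clean_node_enabled:
                # Broken form: super.clean_node(self, node) raises AttributeError,
                # because the built-in type `super` has no attribute clean_node.
                # Correct (Python 2 compatible) delegation to the parent class:
                super(EosService, self).clean_node(node)

    EosService().clean_node("worker1")  # prints "cleaning worker1"
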
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 81b7ffe..6ac5939 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -15,13 +15,14 @@
 
 from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
-from ducktape.mark import parametrize, ignore
+from ducktape.mark import matrix
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
-from kafkatest.version import LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, DEV_BRANCH, KafkaVersion
+from kafkatest.version import LATEST_0_10_2, LATEST_0_11, LATEST_1_0, DEV_BRANCH, KafkaVersion
 import time
 
+upgrade_versions = [str(LATEST_0_10_2), str(LATEST_0_11), str(LATEST_1_0), str(DEV_BRANCH)]
 
 class StreamsUpgradeTest(Test):
     """
@@ -55,7 +56,7 @@ class StreamsUpgradeTest(Test):
             'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
                        'configs': {"min.insync.replicas": self.isr} }
         }
-        
+
 
     def perform_streams_upgrade(self, to_version):
         self.logger.info("First pass bounce - rolling streams upgrade")
@@ -76,14 +77,8 @@ class StreamsUpgradeTest(Test):
             node.version = KafkaVersion(to_version)
             self.kafka.start_node(node)
 
-
     @cluster(num_nodes=6)
-    @parametrize(from_version=str(LATEST_0_10_1), to_version=str(DEV_BRANCH))
-    @parametrize(from_version=str(LATEST_0_10_2), to_version=str(DEV_BRANCH))
-    @parametrize(from_version=str(LATEST_0_10_1), to_version=str(LATEST_0_11_0))
-    @parametrize(from_version=str(LATEST_0_10_2), to_version=str(LATEST_0_11_0))
-    @parametrize(from_version=str(LATEST_0_11_0), to_version=str(LATEST_0_10_2))
-    @parametrize(from_version=str(DEV_BRANCH), to_version=str(LATEST_0_10_2))
+    @matrix(from_version=upgrade_versions, to_version=upgrade_versions)
     def test_upgrade_downgrade_streams(self, from_version, to_version):
         """
         Start a smoke test client, then abort (kill -9) and restart it a few times.
@@ -94,74 +89,79 @@ class StreamsUpgradeTest(Test):
         (search for get_kafka()). For streams in particular, that means that someone has manually
         copied the kafka-streams-$version-test.jar into the right S3 bucket as shown in base.sh.
         """
-        # Setup phase
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.zk.start()
-
-        # number of nodes needs to be >= 3 for the smoke test
-        self.kafka = KafkaService(self.test_context, num_nodes=3,
-                                  zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
-        self.kafka.start()
-        
-        # allow some time for topics to be created
-        time.sleep(10)
-        
-        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
-
-        
-        self.driver.start()
-        self.processor1.start()
-        time.sleep(15)
+        if from_version != to_version:
+            # Setup phase
+            self.zk = ZookeeperService(self.test_context, num_nodes=1)
+            self.zk.start()
 
-        self.perform_streams_upgrade(to_version)
+            # number of nodes needs to be >= 3 for the smoke test
+            self.kafka = KafkaService(self.test_context, num_nodes=3,
+                                      zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
+            self.kafka.start()
 
-        time.sleep(15)
-        self.driver.wait()
-        self.driver.stop()
+            # allow some time for topics to be created
+            time.sleep(10)
 
-        self.processor1.stop()
+            self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
+            self.driver.node.version = KafkaVersion(from_version)
+            self.driver.start()
+
+            self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
+            self.processor1.node.version = KafkaVersion(from_version)
+            self.processor1.start()
+
+            time.sleep(15)
+
+            self.perform_streams_upgrade(to_version)
+
+            time.sleep(15)
+            self.driver.wait()
+            self.driver.stop()
 
-        node = self.driver.node
-        node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
-        self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
+            self.processor1.stop()
+
+            self.driver.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
+            self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
 
 
 
     @cluster(num_nodes=6)
-    @parametrize(from_version=str(LATEST_0_10_2), to_version=str(DEV_BRANCH))
+    @matrix(from_version=upgrade_versions, to_version=upgrade_versions)
     def test_upgrade_brokers(self, from_version, to_version):
         """
-        Start a smoke test client then perform rolling upgrades on the broker. 
+        Start a smoke test client then perform rolling upgrades on the broker.
         """
-        # Setup phase
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.zk.start()
-
-        # number of nodes needs to be >= 3 for the smoke test
-        self.kafka = KafkaService(self.test_context, num_nodes=3,
-                                  zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
-        self.kafka.start()
-        
-        # allow some time for topics to be created
-        time.sleep(10)
-        
-        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
-
-        
-        self.driver.start()
-        self.processor1.start()
-        time.sleep(15)
+        if from_version != to_version:
+            # Setup phase
+            self.zk = ZookeeperService(self.test_context, num_nodes=1)
+            self.zk.start()
 
-        self.perform_broker_upgrade(to_version)
+            # number of nodes needs to be >= 3 for the smoke test
+            self.kafka = KafkaService(self.test_context, num_nodes=3,
+                                      zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
+            self.kafka.start()
 
-        time.sleep(15)
-        self.driver.wait()
-        self.driver.stop()
+            # allow some time for topics to be created
+            time.sleep(10)
 
-        self.processor1.stop()
+            # use the current (dev) version driver
+            self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
+            self.driver.node.version = KafkaVersion(from_version)
+            self.driver.start()
+
+            self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
+            self.processor1.node.version = KafkaVersion(from_version)
+            self.processor1.start()
+
+            time.sleep(15)
+
+            self.perform_broker_upgrade(to_version)
+
+            time.sleep(15)
+            self.driver.wait()
+            self.driver.stop()
+
+            self.processor1.stop()
 
-        node = self.driver.node
-        node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
-        self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
+            self.driver.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
+            self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
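
In streams_upgrade_test.py, the hand-maintained @parametrize list is replaced by ducktape's @matrix decorator over upgrade_versions, which schedules one test case for every (from_version, to_version) pair in the cross product; the new `if from_version != to_version` guard then turns the same-version pairs into no-ops, so only genuine upgrades and downgrades do any work. A rough sketch of that expansion using plain itertools rather than ducktape itself (the version strings are assumed for illustration):

    from itertools import product

    # Mirrors upgrade_versions as of this commit (assumed string values).
    upgrade_versions = ["0.10.2.1", "0.11.0.2", "1.0.1", "dev"]

    scheduled = list(product(upgrade_versions, repeat=2))
    effective = [(f, t) for f, t in scheduled if f != t]  # the guard in the test body

    print(len(scheduled))   # 16 scheduled cases per test method
    print(len(effective))   # 12 that actually perform an upgrade or downgrade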
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index f63a7c1..f88fd31 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -91,5 +91,12 @@ LATEST_0_10 = LATEST_0_10_2
 
 # 0.11.0.0 versions
 V_0_11_0_0 = KafkaVersion("0.11.0.0")
-LATEST_0_11_0 = V_0_11_0_0
+V_0_11_0_1 = KafkaVersion("0.11.0.1")
+V_0_11_0_2 = KafkaVersion("0.11.0.2")
+LATEST_0_11_0 = V_0_11_0_2
 LATEST_0_11 = LATEST_0_11_0
+
+# 1.0.x versions
+V_1_0_0 = KafkaVersion("1.0.0")
+V_1_0_1 = KafkaVersion("1.0.1")
+LATEST_1_0 = V_1_0_1
\ No newline at end of file
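
version.py now knows about the 0.11.0.1/0.11.0.2 and 1.0.x releases and points the LATEST_* aliases at the newest bugfix release of each line, which is what upgrade_versions above consumes via str(LATEST_0_11) and str(LATEST_1_0). The aliases simply encode dotted-version ordering; a standalone illustration of that ordering (not the kafkatest KafkaVersion class):

    def version_key(v):
        return tuple(int(part) for part in v.split("."))

    line_0_11 = ["0.11.0.0", "0.11.0.1", "0.11.0.2"]
    line_1_0 = ["1.0.0", "1.0.1"]

    print(max(line_0_11, key=version_key))  # 0.11.0.2 -> LATEST_0_11_0 / LATEST_0_11
    print(max(line_1_0, key=version_key))   # 1.0.1    -> LATEST_1_0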
diff --git a/tests/setup.py b/tests/setup.py
index 24ee4eb..7d7c4a4 100644
--- a/tests/setup.py
+++ b/tests/setup.py
@@ -38,7 +38,7 @@ class PyTest(TestCommand):
     def run_tests(self):
         # import here, cause outside the eggs aren't loaded
         import pytest
-        print self.pytest_args
+        print(self.pytest_args)
         errno = pytest.main(self.pytest_args)
         sys.exit(errno)
 
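The setup.py tweak swaps the Python 2-only print statement for the print() function: `print self.pytest_args` is a SyntaxError under Python 3, while the function form parses and behaves sensibly on both 2.7 and 3.x. For scripts that must keep printing identically across both interpreters, the usual extra step is the __future__ import, a no-op on Python 3:

    from __future__ import print_function  # enables print() semantics on Python 2

    pytest_args = ["-v", "tests/kafkatest"]
    print(pytest_args)              # fine on 2.7 and 3.x
    print("running:", pytest_args)  # without the __future__ import, Python 2
                                    # would print this as a tuple
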
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 4b55406..bfc3496 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -110,8 +110,12 @@ get_kafka 0.10.1.1 2.11
 chmod a+rw /opt/kafka-0.10.1.1
 get_kafka 0.10.2.1 2.11
 chmod a+rw /opt/kafka-0.10.2.1
-get_kafka 0.11.0.0 2.11
-chmod a+rw /opt/kafka-0.11.0.0
+get_kafka 0.11.0.2 2.11
+chmod a+rw /opt/kafka-0.11.0.2
+get_kafka 1.0.0 2.11
+chmod a+rw /opt/kafka-1.0.0
+get_kafka 1.0.1 2.11
+chmod a+rw /opt/kafka-1.0.1
 
 
 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.