You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/03/15 21:42:48 UTC
[kafka] branch trunk updated: MINOR: Streams system tests
fixes/updates (#4689)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7006d0f MINOR: Streams system tests fixes/updates (#4689)
7006d0f is described below
commit 7006d0f58b9a72f181b13bab6f1f64b0e7b4117e
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Mar 15 14:42:43 2018 -0700
MINOR: Streams system tests fixes/updates (#4689)
Some changes required to get the Streams system tests working via Docker
To test:
TC_PATHS="tests/kafkatest/tests/streams" bash tests/docker/run_tests.sh
That command will take about 3.5 hours, and should pass. Note there are a couple of ignored tests.
Reviewers: Guozhang Wang <wa...@gmail.com>, Bill Bejeck <bi...@confluent.io>
---
.gitignore | 4 +
tests/docker/Dockerfile | 23 +++-
tests/kafkatest/services/streams.py | 5 +-
.../tests/streams/streams_upgrade_test.py | 132 ++++++++++-----------
tests/kafkatest/version.py | 9 +-
tests/setup.py | 2 +-
vagrant/base.sh | 8 +-
7 files changed, 105 insertions(+), 78 deletions(-)
diff --git a/.gitignore b/.gitignore
index 6088349..ba594ff 100644
--- a/.gitignore
+++ b/.gitignore
@@ -47,3 +47,7 @@ tests/venv
docs/generated/
.release-settings.json
+
+kafkatest.egg-info/
+systest/
+
diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile
index 149e391..57ca242 100644
--- a/tests/docker/Dockerfile
+++ b/tests/docker/Dockerfile
@@ -40,12 +40,23 @@ RUN ssh-keygen -q -t rsa -N '' -f /root/.ssh/id_rsa && cp -f /root/.ssh/id_rsa.p
# Install binary test dependencies.
ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages"
-RUN mkdir -p "/opt/kafka-0.8.2.2" && curl -s "$KAFKA_MIRROR/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2"
-RUN mkdir -p "/opt/kafka-0.9.0.1" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1"
-RUN mkdir -p "/opt/kafka-0.10.0.1" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1"
-RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1"
-RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1"
-RUN mkdir -p "/opt/kafka-0.11.0.0" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0"
+RUN mkdir -p "/opt/kafka-0.8.2.2" && chmod a+rw /opt/kafka-0.8.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2"
+RUN mkdir -p "/opt/kafka-0.9.0.1" && chmod a+rw /opt/kafka-0.9.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1"
+RUN mkdir -p "/opt/kafka-0.10.0.1" && chmod a+rw /opt/kafka-0.10.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1"
+RUN mkdir -p "/opt/kafka-0.10.1.1" && chmod a+rw /opt/kafka-0.10.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1"
+RUN mkdir -p "/opt/kafka-0.10.2.1" && chmod a+rw /opt/kafka-0.10.2.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1"
+RUN mkdir -p "/opt/kafka-0.11.0.0" && chmod a+rw /opt/kafka-0.11.0.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0"
+RUN mkdir -p "/opt/kafka-0.11.0.2" && chmod a+rw /opt/kafka-0.11.0.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.2"
+RUN mkdir -p "/opt/kafka-1.0.0" && chmod a+rw /opt/kafka-1.0.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.0"
+RUN mkdir -p "/opt/kafka-1.0.1" && chmod a+rw /opt/kafka-1.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.1"
+
+# Streams test dependencies
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.1.1-test.jar" -o /opt/kafka-0.10.1.1/libs/kafka-streams-0.10.1.1-test.jar && \
+ curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.1-test.jar" -o /opt/kafka-0.10.2.1/libs/kafka-streams-0.10.2.1-test.jar && \
+ curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.0-test.jar" -o /opt/kafka-0.11.0.0/libs/kafka-streams-0.11.0.0-test.jar && \
+ curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.2-test.jar" -o /opt/kafka-0.11.0.2/libs/kafka-streams-0.11.0.2-test.jar && \
+ curl -s "$KAFKA_MIRROR/kafka-streams-1.0.0-test.jar" -o /opt/kafka-1.0.0/libs/kafka-streams-1.0.0-test.jar && \
+ curl -s "$KAFKA_MIRROR/kafka-streams-1.0.1-test.jar" -o /opt/kafka-1.0.1/libs/kafka-streams-1.0.1-test.jar
# The version of Kibosh to use for testing.
# If you update this, also update vagrant/base.sy
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index e552a39..e886c94 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -123,6 +123,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
"INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
" %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
" %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+ self.logger.info("Executing: " + cmd)
return cmd
@@ -163,7 +164,7 @@ class StreamsEosTestBaseService(StreamsTestBaseService):
def clean_node(self, node):
if self.clean_node_enabled:
- super.clean_node(self, node)
+ super(StreamsEosTestBaseService, self).clean_node(node)
class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
@@ -233,7 +234,7 @@ class StreamsBrokerDownResilienceService(StreamsTestBaseService):
"INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
" %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
" %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
-
+ self.logger.info("Executing: " + cmd)
return cmd
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 81b7ffe..6ac5939 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -15,13 +15,14 @@
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
-from ducktape.mark import parametrize, ignore
+from ducktape.mark import matrix
from kafkatest.services.kafka import KafkaService
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
-from kafkatest.version import LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, DEV_BRANCH, KafkaVersion
+from kafkatest.version import LATEST_0_10_2, LATEST_0_11, LATEST_1_0, DEV_BRANCH, KafkaVersion
import time
+upgrade_versions = [str(LATEST_0_10_2), str(LATEST_0_11), str(LATEST_1_0), str(DEV_BRANCH)]
class StreamsUpgradeTest(Test):
"""
@@ -55,7 +56,7 @@ class StreamsUpgradeTest(Test):
'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} }
}
-
+
def perform_streams_upgrade(self, to_version):
self.logger.info("First pass bounce - rolling streams upgrade")
@@ -76,14 +77,8 @@ class StreamsUpgradeTest(Test):
node.version = KafkaVersion(to_version)
self.kafka.start_node(node)
-
@cluster(num_nodes=6)
- @parametrize(from_version=str(LATEST_0_10_1), to_version=str(DEV_BRANCH))
- @parametrize(from_version=str(LATEST_0_10_2), to_version=str(DEV_BRANCH))
- @parametrize(from_version=str(LATEST_0_10_1), to_version=str(LATEST_0_11_0))
- @parametrize(from_version=str(LATEST_0_10_2), to_version=str(LATEST_0_11_0))
- @parametrize(from_version=str(LATEST_0_11_0), to_version=str(LATEST_0_10_2))
- @parametrize(from_version=str(DEV_BRANCH), to_version=str(LATEST_0_10_2))
+ @matrix(from_version=upgrade_versions, to_version=upgrade_versions)
def test_upgrade_downgrade_streams(self, from_version, to_version):
"""
Start a smoke test client, then abort (kill -9) and restart it a few times.
@@ -94,74 +89,79 @@ class StreamsUpgradeTest(Test):
(search for get_kafka()). For streams in particular, that means that someone has manually
copies the kafka-stream-$version-test.jar in the right S3 bucket as shown in base.sh.
"""
- # Setup phase
- self.zk = ZookeeperService(self.test_context, num_nodes=1)
- self.zk.start()
-
- # number of nodes needs to be >= 3 for the smoke test
- self.kafka = KafkaService(self.test_context, num_nodes=3,
- zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
- self.kafka.start()
-
- # allow some time for topics to be created
- time.sleep(10)
-
- self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
- self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
-
-
- self.driver.start()
- self.processor1.start()
- time.sleep(15)
+ if from_version != to_version:
+ # Setup phase
+ self.zk = ZookeeperService(self.test_context, num_nodes=1)
+ self.zk.start()
- self.perform_streams_upgrade(to_version)
+ # number of nodes needs to be >= 3 for the smoke test
+ self.kafka = KafkaService(self.test_context, num_nodes=3,
+ zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
+ self.kafka.start()
- time.sleep(15)
- self.driver.wait()
- self.driver.stop()
+ # allow some time for topics to be created
+ time.sleep(10)
- self.processor1.stop()
+ self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
+ self.driver.node.version = KafkaVersion(from_version)
+ self.driver.start()
+
+ self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
+ self.processor1.node.version = KafkaVersion(from_version)
+ self.processor1.start()
+
+ time.sleep(15)
+
+ self.perform_streams_upgrade(to_version)
+
+ time.sleep(15)
+ self.driver.wait()
+ self.driver.stop()
- node = self.driver.node
- node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
- self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
+ self.processor1.stop()
+
+ self.driver.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
+ self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
@cluster(num_nodes=6)
- @parametrize(from_version=str(LATEST_0_10_2), to_version=str(DEV_BRANCH))
+ @matrix(from_version=upgrade_versions, to_version=upgrade_versions)
def test_upgrade_brokers(self, from_version, to_version):
"""
- Start a smoke test client then perform rolling upgrades on the broker.
+ Start a smoke test client then perform rolling upgrades on the broker.
"""
- # Setup phase
- self.zk = ZookeeperService(self.test_context, num_nodes=1)
- self.zk.start()
-
- # number of nodes needs to be >= 3 for the smoke test
- self.kafka = KafkaService(self.test_context, num_nodes=3,
- zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
- self.kafka.start()
-
- # allow some time for topics to be created
- time.sleep(10)
-
- self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
- self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
-
-
- self.driver.start()
- self.processor1.start()
- time.sleep(15)
+ if from_version != to_version:
+ # Setup phase
+ self.zk = ZookeeperService(self.test_context, num_nodes=1)
+ self.zk.start()
- self.perform_broker_upgrade(to_version)
+ # number of nodes needs to be >= 3 for the smoke test
+ self.kafka = KafkaService(self.test_context, num_nodes=3,
+ zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
+ self.kafka.start()
- time.sleep(15)
- self.driver.wait()
- self.driver.stop()
+ # allow some time for topics to be created
+ time.sleep(10)
- self.processor1.stop()
+ # use the current (dev) version driver
+ self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
+ self.driver.node.version = KafkaVersion(from_version)
+ self.driver.start()
+
+ self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
+ self.processor1.node.version = KafkaVersion(from_version)
+ self.processor1.start()
+
+ time.sleep(15)
+
+ self.perform_broker_upgrade(to_version)
+
+ time.sleep(15)
+ self.driver.wait()
+ self.driver.stop()
+
+ self.processor1.stop()
- node = self.driver.node
- node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
- self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
+ self.driver.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
+ self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index f63a7c1..f88fd31 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -91,5 +91,12 @@ LATEST_0_10 = LATEST_0_10_2
# 0.11.0.0 versions
V_0_11_0_0 = KafkaVersion("0.11.0.0")
-LATEST_0_11_0 = V_0_11_0_0
+V_0_11_0_1 = KafkaVersion("0.11.0.1")
+V_0_11_0_2 = KafkaVersion("0.11.0.2")
+LATEST_0_11_0 = V_0_11_0_2
LATEST_0_11 = LATEST_0_11_0
+
+# 1.0.x versions
+V_1_0_0 = KafkaVersion("1.0.0")
+V_1_0_1 = KafkaVersion("1.0.1")
+LATEST_1_0 = V_1_0_1
\ No newline at end of file
diff --git a/tests/setup.py b/tests/setup.py
index 24ee4eb..7d7c4a4 100644
--- a/tests/setup.py
+++ b/tests/setup.py
@@ -38,7 +38,7 @@ class PyTest(TestCommand):
def run_tests(self):
# import here, cause outside the eggs aren't loaded
import pytest
- print self.pytest_args
+ print(self.pytest_args)
errno = pytest.main(self.pytest_args)
sys.exit(errno)
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 4b55406..bfc3496 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -110,8 +110,12 @@ get_kafka 0.10.1.1 2.11
chmod a+rw /opt/kafka-0.10.1.1
get_kafka 0.10.2.1 2.11
chmod a+rw /opt/kafka-0.10.2.1
-get_kafka 0.11.0.0 2.11
-chmod a+rw /opt/kafka-0.11.0.0
+get_kafka 0.11.0.2 2.11
+chmod a+rw /opt/kafka-0.11.0.2
+get_kafka 1.0.0 2.11
+chmod a+rw /opt/kafka-1.0.0
+get_kafka 1.0.1 2.11
+chmod a+rw /opt/kafka-1.0.1
# For EC2 nodes, we want to use /mnt, which should have the local disk. On local
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.