You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/12/12 02:43:27 UTC
[2/2] kafka git commit: KAFKA-4140: Upgrade to ducktape 0.6.0 and
make system tests parallel friendly
KAFKA-4140: Upgrade to ducktape 0.6.0 and make system tests parallel friendly
Updates to take advantage of soon-to-be-released ducktape features.
Author: Geoff Anderson <ge...@confluent.io>
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1834 from granders/systest-parallel-friendly
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62e043a8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62e043a8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62e043a8
Branch: refs/heads/trunk
Commit: 62e043a86565cc9bc485658d6c6d176e9aff620f
Parents: 6f7ed15
Author: Geoff Anderson <ge...@confluent.io>
Authored: Sun Dec 11 18:43:23 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Sun Dec 11 18:43:23 2016 -0800
----------------------------------------------------------------------
.../kafkatest/benchmarks/core/benchmark_test.py | 13 ++-
.../streams/streams_simple_benchmark_test.py | 5 +-
.../sanity_checks/test_console_consumer.py | 12 ++-
.../sanity_checks/test_kafka_version.py | 3 +
.../sanity_checks/test_performance_services.py | 2 +
.../sanity_checks/test_verifiable_producer.py | 2 +
tests/kafkatest/services/connect.py | 3 +-
tests/kafkatest/services/console_consumer.py | 5 +-
tests/kafkatest/services/kafka/kafka.py | 10 +-
.../kafkatest/services/kafka_log4j_appender.py | 3 +-
tests/kafkatest/services/mirror_maker.py | 4 +-
tests/kafkatest/services/monitor/jmx.py | 44 ++++++--
.../performance/producer_performance.py | 12 ++-
.../services/performance/streams_performance.py | 2 +-
.../services/replica_verification_tool.py | 3 +-
tests/kafkatest/services/security/minikdc.py | 43 +++++++-
.../services/security/security_config.py | 49 +++++----
tests/kafkatest/services/verifiable_consumer.py | 4 +-
tests/kafkatest/services/verifiable_producer.py | 102 ++++++++++++-------
tests/kafkatest/services/zookeeper.py | 8 +-
.../kafkatest/tests/client/compression_test.py | 3 +
.../client/consumer_rolling_upgrade_test.py | 4 +-
tests/kafkatest/tests/client/consumer_test.py | 11 +-
.../tests/client/message_format_change_test.py | 4 +-
tests/kafkatest/tests/client/quota_test.py | 2 +
.../tests/connect/connect_distributed_test.py | 30 ++++--
.../tests/connect/connect_rest_test.py | 9 +-
tests/kafkatest/tests/connect/connect_test.py | 19 ++--
.../core/compatibility_test_new_broker_test.py | 4 +-
.../tests/core/consumer_group_command_test.py | 4 +
.../tests/core/get_offset_shell_test.py | 6 +-
tests/kafkatest/tests/core/mirror_maker_test.py | 13 ++-
.../tests/core/reassign_partitions_test.py | 3 +
tests/kafkatest/tests/core/replication_test.py | 5 +-
.../tests/core/security_rolling_upgrade_test.py | 12 ++-
tests/kafkatest/tests/core/security_test.py | 55 ++++++----
.../tests/core/simple_consumer_shell_test.py | 4 +
tests/kafkatest/tests/core/throttling_test.py | 2 +
tests/kafkatest/tests/core/upgrade_test.py | 5 +
.../core/zookeeper_security_upgrade_test.py | 6 +-
.../tests/streams/streams_bounce_test.py | 8 +-
.../tests/streams/streams_smoke_test.py | 11 +-
.../tests/tools/log4j_appender_test.py | 8 +-
.../tests/tools/replica_verification_test.py | 8 +-
tests/setup.py | 2 +-
45 files changed, 400 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/benchmarks/core/benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py
index 4dbf902..14fab2f 100644
--- a/tests/kafkatest/benchmarks/core/benchmark_test.py
+++ b/tests/kafkatest/benchmarks/core/benchmark_test.py
@@ -15,6 +15,7 @@
from ducktape.mark import matrix
from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
from ducktape.services.service import Service
from ducktape.tests.test import Test
@@ -63,11 +64,13 @@ class Benchmark(Test):
self.kafka.log_level = "INFO" # We don't DEBUG logging here
self.kafka.start()
+ @cluster(num_nodes=5)
@parametrize(acks=1, topic=TOPIC_REP_ONE)
@parametrize(acks=1, topic=TOPIC_REP_THREE)
@parametrize(acks=-1, topic=TOPIC_REP_THREE)
- @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
@matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], compression_type=["none", "snappy"], security_protocol=['PLAINTEXT', 'SSL'])
+ @cluster(num_nodes=7)
+ @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE,
compression_type="none", security_protocol='PLAINTEXT', client_version=str(TRUNK),
broker_version=str(TRUNK)):
@@ -97,6 +100,7 @@ class Benchmark(Test):
self.producer.run()
return compute_aggregate_throughput(self.producer)
+ @cluster(num_nodes=5)
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
@matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
def test_long_term_producer_throughput(self, compression_type="none", security_protocol='PLAINTEXT',
@@ -152,8 +156,11 @@ class Benchmark(Test):
self.logger.info("\n".join(summary))
return data
+ @cluster(num_nodes=5)
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
- @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], compression_type=["none", "snappy"])
+ @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
+ @cluster(num_nodes=6)
+ @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], compression_type=["none", "snappy"])
def test_end_to_end_latency(self, compression_type="none", security_protocol="PLAINTEXT",
interbroker_security_protocol=None, client_version=str(TRUNK),
broker_version=str(TRUNK)):
@@ -181,6 +188,7 @@ class Benchmark(Test):
self.perf.run()
return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
+ @cluster(num_nodes=6)
@parametrize(security_protocol='PLAINTEXT', new_consumer=False)
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
@matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
@@ -229,6 +237,7 @@ class Benchmark(Test):
self.logger.info("\n".join(summary))
return data
+ @cluster(num_nodes=6)
@parametrize(security_protocol='PLAINTEXT', new_consumer=False)
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
@matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index de687e6..ab9b112 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -13,11 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ducktape.mark import ignore
+from ducktape.mark.resource import cluster
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.performance.streams_performance import StreamsSimpleBenchmarkService
-import time
+
class StreamsSimpleBenchmarkTest(KafkaTest):
"""
@@ -29,6 +29,7 @@ class StreamsSimpleBenchmarkTest(KafkaTest):
self.driver = StreamsSimpleBenchmarkService(test_context, self.kafka, 1000000L)
+ @cluster(num_nodes=3)
def test_simple_benchmark(self):
"""
Run simple Kafka Streams benchmark
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/sanity_checks/test_console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 18cbfb7..38db057 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -17,6 +17,7 @@ import time
from ducktape.mark import matrix
from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
@@ -42,9 +43,12 @@ class ConsoleConsumerTest(Test):
def setUp(self):
self.zk.start()
+ @cluster(num_nodes=3)
@parametrize(security_protocol='PLAINTEXT', new_consumer=False)
+ @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+ @cluster(num_nodes=4)
@parametrize(security_protocol='SASL_SSL', sasl_mechanism='PLAIN')
- @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
+ @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'])
def test_lifecycle(self, security_protocol, new_consumer=True, sasl_mechanism='GSSAPI'):
"""Check that console consumer starts/stops properly, and that we are capturing log output."""
@@ -66,14 +70,16 @@ class ConsoleConsumerTest(Test):
# Verify that log output is happening
wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10,
- err_msg="Timed out waiting for logging to start.")
- assert line_count(node, ConsoleConsumer.LOG_FILE) > 0
+ err_msg="Timed out waiting for consumer log file to exist.")
+ wait_until(lambda: line_count(node, ConsoleConsumer.LOG_FILE) > 0, timeout_sec=1,
+ backoff_sec=.25, err_msg="Timed out waiting for log entries to start.")
# Verify no consumed messages
assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0
self.consumer.stop_node(node)
+ @cluster(num_nodes=4)
def test_version(self):
"""Check that console consumer v0.8.2.X successfully starts and consumes messages."""
self.kafka.start()
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/sanity_checks/test_kafka_version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_kafka_version.py b/tests/kafkatest/sanity_checks/test_kafka_version.py
index b33c590..3550093 100644
--- a/tests/kafkatest/sanity_checks/test_kafka_version.py
+++ b/tests/kafkatest/sanity_checks/test_kafka_version.py
@@ -14,6 +14,7 @@
# limitations under the License.
from ducktape.tests.test import Test
+from ducktape.mark.resource import cluster
from kafkatest.services.kafka import KafkaService, config_property
from kafkatest.services.zookeeper import ZookeeperService
@@ -32,6 +33,7 @@ class KafkaVersionTest(Test):
def setUp(self):
self.zk.start()
+ @cluster(num_nodes=2)
def test_0_8_2(self):
"""Test kafka service node-versioning api - verify that we can bring up a single-node 0.8.2.X cluster."""
self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
@@ -42,6 +44,7 @@ class KafkaVersionTest(Test):
assert is_version(node, [LATEST_0_8_2])
+ @cluster(num_nodes=3)
def test_multi_version(self):
"""Test kafka service node-versioning api - ensure we can bring up a 2-node cluster, one on version 0.8.2.X,
the other on trunk."""
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/sanity_checks/test_performance_services.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_performance_services.py b/tests/kafkatest/sanity_checks/test_performance_services.py
index 7b5946a..b939f2b 100644
--- a/tests/kafkatest/sanity_checks/test_performance_services.py
+++ b/tests/kafkatest/sanity_checks/test_performance_services.py
@@ -14,6 +14,7 @@
# limitations under the License.
from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from kafkatest.services.kafka import KafkaService
@@ -35,6 +36,7 @@ class PerformanceServiceTest(Test):
def setUp(self):
self.zk.start()
+ @cluster(num_nodes=5)
# We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check,
# the overhead should be manageable.
@parametrize(version=str(LATEST_0_8_2), new_consumer=False)
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/sanity_checks/test_verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
index 23932f3..544d7b9 100644
--- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py
+++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
@@ -15,6 +15,7 @@
from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
@@ -44,6 +45,7 @@ class TestVerifiableProducer(Test):
self.zk.start()
self.kafka.start()
+ @cluster(num_nodes=3)
@parametrize(producer_version=str(LATEST_0_8_2))
@parametrize(producer_version=str(LATEST_0_9))
@parametrize(producer_version=str(TRUNK))
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 473eb0b..45140fc 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -304,7 +304,8 @@ class VerifiableConnector(object):
self.logger.debug("Ignoring unparseable line: %s", line)
continue
# Filter to only ones matching our name to support multiple verifiable producers
- if data['name'] != self.name: continue
+ if data['name'] != self.name:
+ continue
data['node'] = node
records.append(data)
return records
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 050ea6d..6984fc9 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -15,10 +15,9 @@
import itertools
import os
-import subprocess
from ducktape.services.background_thread import BackgroundThreadService
-from ducktape.utils.util import wait_until
+from ducktape.cluster.remoteaccount import RemoteCommandError
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.monitor.jmx import JmxMixin
@@ -211,7 +210,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
cmd = "ps ax | grep -i console_consumer | grep java | grep -v grep | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
- except (subprocess.CalledProcessError, ValueError) as e:
+ except (RemoteCommandError, ValueError) as e:
return []
def alive(self, node):
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index c79f8c8..f773d8d 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -18,11 +18,11 @@ import json
import os.path
import re
import signal
-import subprocess
import time
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
+from ducktape.cluster.remoteaccount import RemoteCommandError
from config import KafkaConfig
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
@@ -121,8 +121,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
@property
def security_config(self):
- return SecurityConfig(self.security_protocol, self.interbroker_security_protocol,
- zk_sasl = self.zk.zk_sasl,
+ return SecurityConfig(self.context, self.security_protocol, self.interbroker_security_protocol,
+ zk_sasl=self.zk.zk_sasl,
client_sasl_mechanism=self.client_sasl_mechanism, interbroker_sasl_mechanism=self.interbroker_sasl_mechanism)
def open_port(self, protocol):
@@ -208,7 +208,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor:
node.account.ssh(cmd)
- monitor.wait_until("Kafka Server.*started", timeout_sec=30, err_msg="Kafka server didn't finish startup")
+ monitor.wait_until("Kafka Server.*started", timeout_sec=30, backoff_sec=.25, err_msg="Kafka server didn't finish startup")
self.start_jmx_tool(self.idx(node), node)
if len(self.pids(node)) == 0:
@@ -221,7 +221,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
- except (subprocess.CalledProcessError, ValueError) as e:
+ except (RemoteCommandError, ValueError) as e:
return []
def signal_node(self, node, sig=signal.SIGTERM):
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/kafka_log4j_appender.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
index b25d8be..29a4202 100644
--- a/tests/kafkatest/services/kafka_log4j_appender.py
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -34,7 +34,8 @@ class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService):
self.topic = topic
self.max_messages = max_messages
self.security_protocol = security_protocol
- self.security_config = SecurityConfig(security_protocol)
+ self.security_config = SecurityConfig(self.context, security_protocol)
+ self.stop_timeout_sec = 30
def _worker(self, idx, node):
cmd = self.start_cmd(node)
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
index 14af4cf..c056705 100644
--- a/tests/kafkatest/services/mirror_maker.py
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -14,10 +14,10 @@
# limitations under the License.
import os
-import subprocess
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
+from ducktape.cluster.remoteaccount import RemoteCommandError
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
@@ -145,7 +145,7 @@ class MirrorMaker(KafkaPathResolverMixin, Service):
cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
- except (subprocess.CalledProcessError, ValueError) as e:
+ except (RemoteCommandError, ValueError):
return []
def alive(self, node):
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/monitor/jmx.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
index 19ca5fd..e71040b 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from ducktape.cluster.remoteaccount import RemoteCommandError
+from ducktape.utils.util import wait_until
+
class JmxMixin(object):
"""This mixin helps existing service subclasses start JmxTool on their worker nodes and collect jmx stats.
@@ -31,12 +34,19 @@ class JmxMixin(object):
self.maximum_jmx_value = {} # map from object_attribute_name to maximum value observed over time
self.average_jmx_value = {} # map from object_attribute_name to average value observed over time
+ self.jmx_tool_log = "/mnt/jmx_tool.log"
+
def clean_node(self, node):
node.account.kill_process("jmx", clean_shutdown=False, allow_fail=True)
- node.account.ssh("rm -rf /mnt/jmx_tool.log", allow_fail=False)
+ node.account.ssh("rm -rf %s" % self.jmx_tool_log, allow_fail=False)
def start_jmx_tool(self, idx, node):
- if self.started[idx-1] or self.jmx_object_names is None:
+ if self.jmx_object_names is None:
+ self.logger.debug("%s: Not starting jmx tool because no jmx objects are defined" % node.account)
+ return
+
+ if self.started[idx-1]:
+ self.logger.debug("%s: jmx tool has been started already on this node" % node.account)
return
cmd = "%s kafka.tools.JmxTool " % self.path.script("kafka-run-class.sh", node)
@@ -45,31 +55,43 @@ class JmxMixin(object):
cmd += " --object-name %s" % jmx_object_name
for jmx_attribute in self.jmx_attributes:
cmd += " --attributes %s" % jmx_attribute
- cmd += " | tee -a /mnt/jmx_tool.log"
-
- self.logger.debug("Start JmxTool %d command: %s", idx, cmd)
- jmx_output = node.account.ssh_capture(cmd, allow_fail=False)
- jmx_output.next()
+ cmd += " >> %s &" % self.jmx_tool_log
+ self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd))
+ node.account.ssh(cmd, allow_fail=False)
+ wait_until(lambda: self._jmx_has_output(node), timeout_sec=5, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
self.started[idx-1] = True
+ def _jmx_has_output(self, node):
+ """Helper used as a proxy to determine whether jmx is running by that jmx_tool_log contains output."""
+ try:
+ node.account.ssh("test -z \"$(cat %s)\"" % self.jmx_tool_log, allow_fail=False)
+ return False
+ except RemoteCommandError:
+ return True
+
def read_jmx_output(self, idx, node):
- if self.started[idx-1] == False:
+ if not self.started[idx-1]:
return
object_attribute_names = []
- cmd = "cat /mnt/jmx_tool.log"
+ cmd = "cat %s" % self.jmx_tool_log
self.logger.debug("Read jmx output %d command: %s", idx, cmd)
- for line in node.account.ssh_capture(cmd, allow_fail=False):
+ lines = [line for line in node.account.ssh_capture(cmd, allow_fail=False)]
+ assert len(lines) > 1, "There don't appear to be any samples in the jmx tool log: %s" % lines
+
+ for line in lines:
if "time" in line:
object_attribute_names = line.strip()[1:-1].split("\",\"")[1:]
continue
stats = [float(field) for field in line.split(',')]
time_sec = int(stats[0]/1000)
- self.jmx_stats[idx-1][time_sec] = {name : stats[i+1] for i, name in enumerate(object_attribute_names)}
+ self.jmx_stats[idx-1][time_sec] = {name: stats[i+1] for i, name in enumerate(object_attribute_names)}
# do not calculate average and maximum of jmx stats until we have read output from all nodes
+ # If the service is multithreaded, this means that the results will be aggregated only when the last
+ # service finishes
if any(len(time_to_stats) == 0 for time_to_stats in self.jmx_stats):
return
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index 7a0ccdd..1113e0d 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -14,9 +14,9 @@
# limitations under the License.
import os
-import subprocess
-
+import time
from ducktape.utils.util import wait_until
+from ducktape.cluster.remoteaccount import RemoteCommandError
from kafkatest.directory_layout.kafka_path import TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
from kafkatest.services.monitor.jmx import JmxMixin
@@ -118,7 +118,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
cmd = "jps | grep -i ProducerPerformance | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
- except (subprocess.CalledProcessError, ValueError) as e:
+ except (RemoteCommandError, ValueError) as e:
return []
def alive(self, node):
@@ -136,6 +136,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
self.logger.debug("Producer performance %d command: %s", idx, cmd)
# start ProducerPerformance process
+ start = time.time()
producer_output = node.account.ssh_capture(cmd)
wait_until(lambda: self.alive(node), timeout_sec=20, err_msg="ProducerPerformance failed to start")
# block until there is at least one line of output
@@ -144,7 +145,10 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
raise Exception("No output from ProducerPerformance")
self.start_jmx_tool(idx, node)
- wait_until(lambda: not self.alive(node), timeout_sec=1200, err_msg="ProducerPerformance failed to finish")
+ wait_until(lambda: not self.alive(node), timeout_sec=1200, backoff_sec=2, err_msg="ProducerPerformance failed to finish")
+ elapsed = time.time() - start
+ self.logger.debug("ProducerPerformance process ran for %s seconds" % elapsed)
+
self.read_jmx_output(idx, node)
# parse producer output from file
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/performance/streams_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py
index b7d6b89..0af13f9 100644
--- a/tests/kafkatest/services/performance/streams_performance.py
+++ b/tests/kafkatest/services/performance/streams_performance.py
@@ -78,7 +78,7 @@ class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service):
def wait(self):
for node in self.nodes:
for pid in self.pids(node):
- wait_until(lambda: not node.account.alive(pid), timeout_sec=600, err_msg="SimpleBenchmark process on " + str(node.account) + " took too long to exit")
+ wait_until(lambda: not node.account.alive(pid), timeout_sec=600, backoff_sec=1, err_msg="SimpleBenchmark process on " + str(node.account) + " took too long to exit")
def clean_node(self, node):
node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/replica_verification_tool.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py
index 2f29d16..a2753fd 100644
--- a/tests/kafkatest/services/replica_verification_tool.py
+++ b/tests/kafkatest/services/replica_verification_tool.py
@@ -36,7 +36,7 @@ class ReplicaVerificationTool(KafkaPathResolverMixin, BackgroundThreadService):
self.topic = topic
self.report_interval_ms = report_interval_ms
self.security_protocol = security_protocol
- self.security_config = SecurityConfig(security_protocol)
+ self.security_config = SecurityConfig(self.context, security_protocol)
self.partition_lag = {}
self.stop_timeout_sec = stop_timeout_sec
@@ -65,6 +65,7 @@ class ReplicaVerificationTool(KafkaPathResolverMixin, BackgroundThreadService):
topic_partition = topic + ',' + str(partition)
lag = self.partition_lag.get(topic_partition, -1)
self.logger.debug("Retuning lag for {} as {}".format(topic_partition, lag))
+
return lag
def start_cmd(self, node):
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/security/minikdc.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/minikdc.py b/tests/kafkatest/services/security/minikdc.py
index 3189ddc..b3cbeae 100644
--- a/tests/kafkatest/services/security/minikdc.py
+++ b/tests/kafkatest/services/security/minikdc.py
@@ -14,6 +14,7 @@
# limitations under the License.
import os
+import random
import uuid
from io import open
from os import remove, close
@@ -39,14 +40,48 @@ class MiniKdc(KafkaPathResolverMixin, Service):
KEYTAB_FILE = "/mnt/minikdc/keytab"
KRB5CONF_FILE = "/mnt/minikdc/krb5.conf"
LOG_FILE = "/mnt/minikdc/minikdc.log"
- LOCAL_KEYTAB_FILE = "/tmp/" + str(uuid.uuid4().get_hex()) + "_keytab"
- LOCAL_KRB5CONF_FILE = "/tmp/" + str(uuid.uuid4().get_hex()) + "_krb5.conf"
+
+ LOCAL_KEYTAB_FILE = None
+ LOCAL_KRB5CONF_FILE = None
+
+ @staticmethod
+ def _set_local_keytab_file(local_scratch_dir):
+ """Set MiniKdc.LOCAL_KEYTAB_FILE exactly once per test.
+
+ LOCAL_KEYTAB_FILE is currently used like a global variable to provide a mechanism to share the
+ location of the local keytab file among all services which might need it.
+
+ Since individual ducktape tests are each run in a subprocess forked from the ducktape main process,
+ class variables set at class load time are duplicated between test processes. This leads to collisions
+ if test subprocesses are run in parallel, so we defer setting these class variables until after the test itself
+ begins to run.
+ """
+ if MiniKdc.LOCAL_KEYTAB_FILE is None:
+ MiniKdc.LOCAL_KEYTAB_FILE = os.path.join(local_scratch_dir, "keytab")
+ return MiniKdc.LOCAL_KEYTAB_FILE
+
+ @staticmethod
+ def _set_local_krb5conf_file(local_scratch_dir):
+ """Set MiniKdc.LOCAL_KRB5CONF_FILE exactly once per test.
+
+ See _set_local_keytab_file for details why we do this.
+ """
+
+ if MiniKdc.LOCAL_KRB5CONF_FILE is None:
+ MiniKdc.LOCAL_KRB5CONF_FILE = os.path.join(local_scratch_dir, "krb5conf")
+ return MiniKdc.LOCAL_KRB5CONF_FILE
def __init__(self, context, kafka_nodes, extra_principals=""):
super(MiniKdc, self).__init__(context, 1)
self.kafka_nodes = kafka_nodes
self.extra_principals = extra_principals
+ # context.local_scratch_dir uses a ducktape feature:
+ # each test_context object has a unique local scratch directory which is available for the duration of the test
+ # which is automatically garbage collected after the test finishes
+ MiniKdc._set_local_keytab_file(context.local_scratch_dir)
+ MiniKdc._set_local_krb5conf_file(context.local_scratch_dir)
+
def replace_in_file(self, file_path, pattern, subst):
fh, abs_path = mkstemp()
with open(abs_path, 'w') as new_file:
@@ -80,8 +115,8 @@ class MiniKdc(KafkaPathResolverMixin, Service):
node.account.ssh(cmd)
monitor.wait_until("MiniKdc Running", timeout_sec=60, backoff_sec=1, err_msg="MiniKdc didn't finish startup")
- node.account.scp_from(MiniKdc.KEYTAB_FILE, MiniKdc.LOCAL_KEYTAB_FILE)
- node.account.scp_from(MiniKdc.KRB5CONF_FILE, MiniKdc.LOCAL_KRB5CONF_FILE)
+ node.account.copy_from(MiniKdc.KEYTAB_FILE, MiniKdc.LOCAL_KEYTAB_FILE)
+ node.account.copy_from(MiniKdc.KRB5CONF_FILE, MiniKdc.LOCAL_KRB5CONF_FILE)
# KDC is set to bind openly (via 0.0.0.0). Change krb5.conf to hold the specific KDC address
self.replace_in_file(MiniKdc.LOCAL_KRB5CONF_FILE, '0.0.0.0', node.account.hostname)
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/security/security_config.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index 665c4b0..9b29217 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import atexit
import os
import subprocess
from tempfile import mkdtemp
@@ -22,21 +21,23 @@ from ducktape.template import TemplateRenderer
from kafkatest.services.security.minikdc import MiniKdc
import itertools
+
class SslStores(object):
- def __init__(self):
- self.ca_and_truststore_dir = mkdtemp(dir="/tmp")
- self.ca_crt_path = os.path.join(self.ca_and_truststore_dir, "test.ca.crt")
- self.ca_jks_path = os.path.join(self.ca_and_truststore_dir, "test.ca.jks")
+ def __init__(self, local_scratch_dir):
+ self.ca_crt_path = os.path.join(local_scratch_dir, "test.ca.crt")
+ self.ca_jks_path = os.path.join(local_scratch_dir, "test.ca.jks")
self.ca_passwd = "test-ca-passwd"
- self.truststore_path = os.path.join(self.ca_and_truststore_dir, "test.truststore.jks")
+ self.truststore_path = os.path.join(local_scratch_dir, "test.truststore.jks")
self.truststore_passwd = "test-ts-passwd"
self.keystore_passwd = "test-ks-passwd"
self.key_passwd = "test-key-passwd"
# Allow upto one hour of clock skew between host and VMs
self.startdate = "-1H"
- # Register rmtree to run on exit
- atexit.register(rmtree, self.ca_and_truststore_dir)
+
+ for file in [self.ca_crt_path, self.ca_jks_path, self.truststore_path]:
+ if os.path.exists(file):
+ os.remove(file)
def generate_ca(self):
"""
@@ -69,7 +70,7 @@ class SslStores(object):
self.runcmd("keytool -gencert -keystore %s -storepass %s -alias ca -infile %s -outfile %s -dname CN=systemtest -ext SAN=DNS:%s -startdate %s" % (self.ca_jks_path, self.ca_passwd, csr_path, crt_path, self.hostname(node), self.startdate))
self.runcmd("keytool -importcert -keystore %s -storepass %s -alias ca -file %s -noprompt" % (ks_path, self.keystore_passwd, self.ca_crt_path))
self.runcmd("keytool -importcert -keystore %s -storepass %s -keypass %s -alias kafka -file %s -noprompt" % (ks_path, self.keystore_passwd, self.key_passwd, crt_path))
- node.account.scp_to(ks_path, SecurityConfig.KEYSTORE_PATH)
+ node.account.copy_to(ks_path, SecurityConfig.KEYSTORE_PATH)
rmtree(ks_dir)
def hostname(self, node):
@@ -79,9 +80,10 @@ class SslStores(object):
def runcmd(self, cmd):
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
- proc.communicate()
+ stdout, stderr = proc.communicate()
+
if proc.returncode != 0:
- raise subprocess.CalledProcessError(proc.returncode, cmd)
+ raise RuntimeError("Command '%s' returned non-zero exit status %d: %s" % (cmd, proc.returncode, stdout))
class SecurityConfig(TemplateRenderer):
@@ -99,11 +101,10 @@ class SecurityConfig(TemplateRenderer):
KRB5CONF_PATH = "/mnt/security/krb5.conf"
KEYTAB_PATH = "/mnt/security/keytab"
- ssl_stores = SslStores()
- ssl_stores.generate_ca()
- ssl_stores.generate_truststore()
+ # This is initialized only when the first instance of SecurityConfig is created
+ ssl_stores = None
- def __init__(self, security_protocol=None, interbroker_security_protocol=None,
+ def __init__(self, context, security_protocol=None, interbroker_security_protocol=None,
client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
zk_sasl=False, template_props=""):
"""
@@ -114,6 +115,15 @@ class SecurityConfig(TemplateRenderer):
template properties either, PLAINTEXT is used as default.
"""
+ self.context = context
+ if not SecurityConfig.ssl_stores:
+ # This generates keystore/truststore files in a local scratch directory which gets
+ # automatically destroyed after the test is run
+ # Creating within the scratch directory allows us to run tests in parallel without fear of collision
+ SecurityConfig.ssl_stores = SslStores(context.local_scratch_dir)
+ SecurityConfig.ssl_stores.generate_ca()
+ SecurityConfig.ssl_stores.generate_truststore()
+
if security_protocol is None:
security_protocol = self.get_property('security.protocol', template_props)
if security_protocol is None:
@@ -140,13 +150,12 @@ class SecurityConfig(TemplateRenderer):
'sasl.kerberos.service.name' : 'kafka'
}
-
def client_config(self, template_props=""):
- return SecurityConfig(self.security_protocol, client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props)
+ return SecurityConfig(self.context, self.security_protocol, client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props)
def setup_ssl(self, node):
node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
- node.account.scp_to(SecurityConfig.ssl_stores.truststore_path, SecurityConfig.TRUSTSTORE_PATH)
+ node.account.copy_to(SecurityConfig.ssl_stores.truststore_path, SecurityConfig.TRUSTSTORE_PATH)
SecurityConfig.ssl_stores.generate_and_copy_keystore(node)
def setup_sasl(self, node):
@@ -162,8 +171,8 @@ class SecurityConfig(TemplateRenderer):
enabled_sasl_mechanisms=self.enabled_sasl_mechanisms)
node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
if self.has_sasl_kerberos:
- node.account.scp_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH)
- node.account.scp_to(MiniKdc.LOCAL_KRB5CONF_FILE, SecurityConfig.KRB5CONF_PATH)
+ node.account.copy_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH)
+ node.account.copy_to(MiniKdc.LOCAL_KRB5CONF_FILE, SecurityConfig.KRB5CONF_PATH)
def setup_node(self, node):
if self.has_ssl:
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/verifiable_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 9c6abdd..c593e2a 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -16,9 +16,9 @@
import json
import os
import signal
-import subprocess
from ducktape.services.background_thread import BackgroundThreadService
+from ducktape.cluster.remoteaccount import RemoteCommandError
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka import TopicPartition
@@ -243,7 +243,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
cmd = "jps | grep -i VerifiableConsumer | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
- except (subprocess.CalledProcessError, ValueError) as e:
+ except (RemoteCommandError, ValueError) as e:
return []
def try_parse_json(self, string):
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index dbdf71f..205143e 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -16,19 +16,22 @@
import json
import os
import signal
-import subprocess
import time
from ducktape.services.background_thread import BackgroundThreadService
+from ducktape.cluster.remoteaccount import RemoteCommandError
+from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin, TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
from kafkatest.utils import is_int, is_int_with_prefix
from kafkatest.version import TRUNK, LATEST_0_8_2
+from kafkatest.utils.remote_account import line_count
class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
PERSISTENT_ROOT = "/mnt/verifiable_producer"
STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stdout")
+ STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stderr")
LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
LOG_FILE = os.path.join(LOG_DIR, "verifiable_producer.log")
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
@@ -38,6 +41,9 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
"verifiable_producer_stdout": {
"path": STDOUT_CAPTURE,
"collect_default": False},
+ "verifiable_producer_stderr": {
+ "path": STDERR_CAPTURE,
+ "collect_default": False},
"verifiable_producer_log": {
"path": LOG_FILE,
"collect_default": True}
@@ -114,37 +120,63 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
self.produced_count[idx] = 0
last_produced_time = time.time()
prev_msg = None
- for line in node.account.ssh_capture(cmd):
- line = line.strip()
-
- data = self.try_parse_json(line)
- if data is not None:
-
- with self.lock:
- if data["name"] == "producer_send_error":
- data["node"] = idx
- self.not_acked_values.append(self.message_validator(data["value"]))
- self.produced_count[idx] += 1
-
- elif data["name"] == "producer_send_success":
- self.acked_values.append(self.message_validator(data["value"]))
- self.produced_count[idx] += 1
-
- # Log information if there is a large gap between successively acknowledged messages
- t = time.time()
- time_delta_sec = t - last_produced_time
- if time_delta_sec > 2 and prev_msg is not None:
- self.logger.debug(
- "Time delta between successively acked messages is large: " +
- "delta_t_sec: %s, prev_message: %s, current_message: %s" % (str(time_delta_sec), str(prev_msg), str(data)))
-
- last_produced_time = t
- prev_msg = data
-
- elif data["name"] == "shutdown_complete":
- if node in self.clean_shutdown_nodes:
- raise Exception("Unexpected shutdown event from producer, already shutdown. Producer index: %d" % idx)
- self.clean_shutdown_nodes.add(node)
+ node.account.ssh(cmd)
+
+ # Ensure that STDOUT_CAPTURE exists before trying to read from it
+ # Note that if max_messages is configured, it's possible for the process to exit before this
+ # wait_until condition is checked
+ start = time.time()
+ wait_until(lambda: node.account.isfile(VerifiableProducer.STDOUT_CAPTURE) and
+ line_count(node, VerifiableProducer.STDOUT_CAPTURE) > 0,
+ timeout_sec=10, err_msg="%s: VerifiableProducer took too long to start" % node.account)
+ self.logger.debug("%s: VerifiableProducer took %s seconds to start" % (node.account, time.time() - start))
+
+ with node.account.open(VerifiableProducer.STDOUT_CAPTURE, 'r') as f:
+ while True:
+ line = f.readline()
+ if line == '' and not self.alive(node):
+ # The process is gone, and we've reached the end of the output file, so we don't expect
+ # any more output to appear in the STDOUT_CAPTURE file
+ break
+
+ line = line.strip()
+
+ data = self.try_parse_json(line)
+ if data is not None:
+
+ with self.lock:
+ if data["name"] == "producer_send_error":
+ data["node"] = idx
+ self.not_acked_values.append(self.message_validator(data["value"]))
+ self.produced_count[idx] += 1
+
+ elif data["name"] == "producer_send_success":
+ self.acked_values.append(self.message_validator(data["value"]))
+ self.produced_count[idx] += 1
+
+ # Log information if there is a large gap between successively acknowledged messages
+ t = time.time()
+ time_delta_sec = t - last_produced_time
+ if time_delta_sec > 2 and prev_msg is not None:
+ self.logger.debug(
+ "Time delta between successively acked messages is large: " +
+ "delta_t_sec: %s, prev_message: %s, current_message: %s" % (str(time_delta_sec), str(prev_msg), str(data)))
+
+ last_produced_time = t
+ prev_msg = data
+
+ elif data["name"] == "shutdown_complete":
+ if node in self.clean_shutdown_nodes:
+ raise Exception("Unexpected shutdown event from producer, already shutdown. Producer index: %d" % idx)
+ self.clean_shutdown_nodes.add(node)
+
+ def _has_output(self, node):
+ """Helper used as a proxy to determine whether the producer is running: returns True if VerifiableProducer.STDOUT_CAPTURE on the node contains output."""
+ try:
+ node.account.ssh("test -z \"$(cat %s)\"" % VerifiableProducer.STDOUT_CAPTURE, allow_fail=False)
+ return False
+ except RemoteCommandError:
+ return True
def start_cmd(self, node, idx):
cmd = ""
@@ -171,10 +203,10 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
if self.message_validator == is_int_with_prefix:
cmd += " --value-prefix %s" % str(idx)
if self.acks is not None:
- cmd += " --acks %s\n" % str(self.acks)
+ cmd += " --acks %s " % str(self.acks)
cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
- cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
+ cmd += " 2>> %s 1>> %s &" % (VerifiableProducer.STDERR_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
return cmd
def kill_node(self, node, clean_shutdown=True, allow_fail=False):
@@ -190,7 +222,7 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
cmd = "jps | grep -i VerifiableProducer | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
- except (subprocess.CalledProcessError, ValueError) as e:
+ except (RemoteCommandError, ValueError) as e:
return []
def alive(self, node):
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/zookeeper.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index 2019889..8d38d48 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -15,10 +15,11 @@
import re
-import subprocess
import time
from ducktape.services.service import Service
+from ducktape.utils.util import wait_until
+from ducktape.cluster.remoteaccount import RemoteCommandError
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.security.security_config import SecurityConfig
@@ -46,7 +47,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
@property
def security_config(self):
- return SecurityConfig(zk_sasl=self.zk_sasl)
+ return SecurityConfig(self.context, zk_sasl=self.zk_sasl)
@property
def security_system_properties(self):
@@ -85,7 +86,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
cmd = "ps ax | grep -i zookeeper | grep java | grep -v grep | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
- except (subprocess.CalledProcessError, ValueError) as e:
+ except (RemoteCommandError, ValueError) as e:
return []
def alive(self, node):
@@ -95,6 +96,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
idx = self.idx(node)
self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
node.account.kill_process("zookeeper", allow_fail=False)
+ wait_until(lambda: not self.alive(node), timeout_sec=5, err_msg="Timed out waiting for zookeeper to stop.")
def clean_node(self, node):
self.logger.info("Cleaning ZK node %d on %s", self.idx(node), node.account.hostname)
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/client/compression_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/compression_test.py b/tests/kafkatest/tests/client/compression_test.py
index 0de53ae..9301de4 100644
--- a/tests/kafkatest/tests/client/compression_test.py
+++ b/tests/kafkatest/tests/client/compression_test.py
@@ -15,6 +15,7 @@
from ducktape.mark import parametrize
from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
@@ -23,6 +24,7 @@ from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int_with_prefix
+
class CompressionTest(ProduceConsumeValidateTest):
"""
These tests validate produce / consume for compressed topics.
@@ -51,6 +53,7 @@ class CompressionTest(ProduceConsumeValidateTest):
# Override this since we're adding services outside of the constructor
return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+ @cluster(num_nodes=7)
@parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=True)
@parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=False)
def test_compressed_topic(self, compression_types, new_consumer):
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
index 3cd3c7c..e5904b1 100644
--- a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
@@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
+
from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
from kafkatest.services.kafka import TopicPartition
@@ -43,6 +44,7 @@ class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
+ @cluster(num_nodes=4)
def rolling_update_test(self):
"""
Verify rolling updates of partition assignment strategies works correctly. In this
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/client/consumer_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index 534f65c..a68e23e 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -15,12 +15,14 @@
from ducktape.mark import matrix
from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
from kafkatest.services.kafka import TopicPartition
import signal
+
class OffsetValidationTest(VerifiableConsumerTest):
TOPIC = "test_topic"
NUM_PARTITIONS = 1
@@ -72,6 +74,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
return consumer
+ @cluster(num_nodes=7)
def test_broker_rolling_bounce(self):
"""
Verify correct consumer behavior when the brokers are consecutively restarted.
@@ -112,6 +115,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
assert consumer.current_position(partition) == consumer.total_consumed(), \
"Total consumed records did not match consumed position"
+ @cluster(num_nodes=7)
@matrix(clean_shutdown=[True, False], bounce_mode=["all", "rolling"])
def test_consumer_bounce(self, clean_shutdown, bounce_mode):
"""
@@ -152,6 +156,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
assert consumer.current_position(partition) <= consumer.total_consumed(), \
"Current position greater than the total number of consumed records"
+ @cluster(num_nodes=7)
@matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
def test_consumer_failure(self, clean_shutdown, enable_autocommit):
partition = TopicPartition(self.TOPIC, 0)
@@ -194,7 +199,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
assert consumer.last_commit(partition) == consumer.current_position(partition), \
"Last committed offset did not match last consumed position"
-
+ @cluster(num_nodes=7)
@matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
def test_broker_failure(self, clean_shutdown, enable_autocommit):
partition = TopicPartition(self.TOPIC, 0)
@@ -229,6 +234,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
assert consumer.last_commit(partition) == consumer.current_position(partition), \
"Last committed offset did not match last consumed position"
+ @cluster(num_nodes=7)
def test_group_consumption(self):
"""
Verifies correct group rebalance behavior as consumers are started and stopped.
@@ -277,6 +283,7 @@ class AssignmentValidationTest(VerifiableConsumerTest):
self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 },
})
+ @cluster(num_nodes=6)
@matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
"org.apache.kafka.clients.consumer.RoundRobinAssignor"])
def test_valid_assignment(self, assignment_strategy):
@@ -294,4 +301,4 @@ class AssignmentValidationTest(VerifiableConsumerTest):
consumer.start_node(node)
self.await_members(consumer, num_started)
assert self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment())
-
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/client/message_format_change_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/message_format_change_test.py b/tests/kafkatest/tests/client/message_format_change_test.py
index a57c04b..edcead2 100644
--- a/tests/kafkatest/tests/client/message_format_change_test.py
+++ b/tests/kafkatest/tests/client/message_format_change_test.py
@@ -14,6 +14,7 @@
from ducktape.mark import parametrize
from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService
@@ -55,7 +56,8 @@ class MessageFormatChangeTest(ProduceConsumeValidateTest):
lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
timeout_sec=120, backoff_sec=1,
err_msg="Producer did not produce all messages in reasonable amount of time"))
-
+
+ @cluster(num_nodes=10)
@parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
def test_compatibility(self, producer_version, consumer_version):
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/client/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
index 1d31569..baed837 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -15,6 +15,7 @@
from ducktape.tests.test import Test
from ducktape.mark import matrix, parametrize
+from ducktape.mark.resource import cluster
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
@@ -124,6 +125,7 @@ class QuotaTest(Test):
"""Override this since we're adding services outside of the constructor"""
return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+ @cluster(num_nodes=5)
@matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False])
@parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index ee0a222..f49bb5d 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -14,18 +14,22 @@
# limitations under the License.
from ducktape.tests.test import Test
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+from ducktape.mark import matrix, parametrize
+from ducktape.cluster.remoteaccount import RemoteCommandError
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink, ConnectRestError, MockSink, MockSource
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.security.security_config import SecurityConfig
-from ducktape.utils.util import wait_until
-from ducktape.mark import matrix
-import subprocess, itertools, time
+
+import itertools, time
from collections import Counter, namedtuple
import operator
+
class ConnectDistributedTest(Test):
"""
Simple test of Kafka Connect in distributed mode, producing data from files on one cluster and consuming it on
@@ -139,6 +143,7 @@ class ConnectDistributedTest(Test):
status = self._connector_status(connector.name, node)
return self._task_has_state(task_id, status, 'RUNNING')
+ @cluster(num_nodes=5)
def test_restart_failed_connector(self):
self.setup_services()
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -155,6 +160,7 @@ class ConnectDistributedTest(Test):
wait_until(lambda: self.connector_is_running(self.sink), timeout_sec=10,
err_msg="Failed to see connector transition to the RUNNING state")
+ @cluster(num_nodes=5)
@matrix(connector_type=["source", "sink"])
def test_restart_failed_task(self, connector_type):
self.setup_services()
@@ -178,7 +184,7 @@ class ConnectDistributedTest(Test):
wait_until(lambda: self.task_is_running(connector, task_id), timeout_sec=10,
err_msg="Failed to see task transition to the RUNNING state")
-
+ @cluster(num_nodes=5)
def test_pause_and_resume_source(self):
"""
Verify that source connectors stop producing records when paused and begin again after
@@ -217,6 +223,7 @@ class ConnectDistributedTest(Test):
wait_until(lambda: len(self.source.messages()) > num_messages, timeout_sec=30,
err_msg="Failed to produce messages after resuming source connector")
+ @cluster(num_nodes=5)
def test_pause_and_resume_sink(self):
"""
Verify that sink connectors stop consuming records when paused and begin again after
@@ -259,7 +266,7 @@ class ConnectDistributedTest(Test):
wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=30,
err_msg="Failed to consume messages after resuming source connector")
-
+ @cluster(num_nodes=5)
def test_pause_state_persistent(self):
"""
Verify that paused state is preserved after a cluster restart.
@@ -284,7 +291,10 @@ class ConnectDistributedTest(Test):
wait_until(lambda: self.is_paused(self.source, node), timeout_sec=30,
err_msg="Failed to see connector startup in PAUSED state")
- @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
+ @cluster(num_nodes=5)
+ @parametrize(security_protocol=SecurityConfig.PLAINTEXT)
+ @cluster(num_nodes=6)
+ @parametrize(security_protocol=SecurityConfig.SASL_SSL)
def test_file_source_and_sink(self, security_protocol):
"""
Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
@@ -315,7 +325,7 @@ class ConnectDistributedTest(Test):
node.account.ssh("echo -e -n " + repr(self.SECOND_INPUTS) + " >> " + self.INPUT_FILE)
wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=70, err_msg="Sink output file never converged to the same state as the input file")
-
+ @cluster(num_nodes=5)
@matrix(clean=[True, False])
def test_bounce(self, clean):
"""
@@ -424,8 +434,6 @@ class ConnectDistributedTest(Test):
assert success, "Found validation errors:\n" + "\n ".join(errors)
-
-
def _validate_file_output(self, input):
input_set = set(input)
# Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled.
@@ -437,8 +445,8 @@ class ConnectDistributedTest(Test):
def _file_contents(self, node, file):
try:
- # Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of
+ # Convert to a list here or the RemoteCommandError may be raised during a call to the generator instead of
# immediately
return list(node.account.ssh_capture("cat " + file))
- except subprocess.CalledProcessError:
+ except RemoteCommandError:
return []
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/connect/connect_rest_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py
index 70bc32c..098790b 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -16,7 +16,9 @@
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.connect import ConnectDistributedService, ConnectRestError
from ducktape.utils.util import wait_until
-import subprocess
+from ducktape.mark.resource import cluster
+from ducktape.cluster.remoteaccount import RemoteCommandError
+
import json
import itertools
@@ -57,6 +59,7 @@ class ConnectRestApiTest(KafkaTest):
self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE])
+ @cluster(num_nodes=4)
def test_rest_api(self):
# Template parameters
self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
@@ -171,10 +174,10 @@ class ConnectRestApiTest(KafkaTest):
def file_contents(self, node, file):
try:
- # Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of
+ # Convert to a list here or the RemoteCommandError may be raised during a call to the generator instead of
# immediately
return list(node.account.ssh_capture("cat " + file))
- except subprocess.CalledProcessError:
+ except RemoteCommandError:
return []
def _config_dict_from_props(self, connector_props):
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/connect/connect_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 83acb4a..9436119 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -14,15 +14,20 @@
# limitations under the License.
from ducktape.tests.test import Test
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+from ducktape.mark import parametrize, matrix
+from ducktape.cluster.remoteaccount import RemoteCommandError
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.connect import ConnectStandaloneService
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.security.security_config import SecurityConfig
-from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize, matrix
-import hashlib, subprocess, json
+
+import hashlib
+import json
+
class ConnectStandaloneFileTest(Test):
"""
@@ -58,10 +63,13 @@ class ConnectStandaloneFileTest(Test):
self.zk = ZookeeperService(test_context, self.num_zk)
+ @cluster(num_nodes=5)
@parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=True)
@parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=False)
@parametrize(converter="org.apache.kafka.connect.storage.StringConverter", schemas=None)
- @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
+ @parametrize(security_protocol=SecurityConfig.PLAINTEXT)
+ @cluster(num_nodes=6)
+ @parametrize(security_protocol=SecurityConfig.SASL_SSL)
def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True, security_protocol='PLAINTEXT'):
"""
Validates basic end-to-end functionality of Connect standalone using the file source and sink converters. Includes
@@ -85,7 +93,6 @@ class ConnectStandaloneFileTest(Test):
self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC,
consumer_timeout_ms=1000)
-
self.zk.start()
self.kafka.start()
@@ -118,5 +125,5 @@ class ConnectStandaloneFileTest(Test):
try:
output_hash = list(self.sink.node.account.ssh_capture("md5sum " + self.OUTPUT_FILE))[0].strip().split()[0]
return output_hash == hashlib.md5(value).hexdigest()
- except subprocess.CalledProcessError:
+ except RemoteCommandError:
return False
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
index d6a0a12..f3931ec 100644
--- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
+++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
@@ -14,6 +14,7 @@
from ducktape.mark import parametrize
from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService
@@ -43,6 +44,7 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
self.num_consumers = 1
self.messages_per_producer = 1000
+ @cluster(num_nodes=6)
@parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None)
@parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=None)
@@ -54,7 +56,7 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
@parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=str("LogAppendTime"))
def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None):
-
+
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/consumer_group_command_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/consumer_group_command_test.py b/tests/kafkatest/tests/core/consumer_group_command_test.py
index c3f59d9..c03022a 100644
--- a/tests/kafkatest/tests/core/consumer_group_command_test.py
+++ b/tests/kafkatest/tests/core/consumer_group_command_test.py
@@ -17,6 +17,7 @@
from ducktape.utils.util import wait_until
from ducktape.tests.test import Test
from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
@@ -28,6 +29,7 @@ import re
TOPIC = "topic-consumer-group-command"
+
class ConsumerGroupCommandTest(Test):
"""
Tests ConsumerGroupCommand
@@ -89,6 +91,7 @@ class ConsumerGroupCommandTest(Test):
self.consumer.stop()
+ @cluster(num_nodes=3)
@matrix(security_protocol=['PLAINTEXT', 'SSL'])
def test_list_consumer_groups(self, security_protocol='PLAINTEXT'):
"""
@@ -97,6 +100,7 @@ class ConsumerGroupCommandTest(Test):
"""
self.setup_and_verify(security_protocol)
+ @cluster(num_nodes=3)
@matrix(security_protocol=['PLAINTEXT', 'SSL'])
def test_describe_consumer_group(self, security_protocol='PLAINTEXT'):
"""
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/get_offset_shell_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/get_offset_shell_test.py b/tests/kafkatest/tests/core/get_offset_shell_test.py
index 38bd9dc..e45365d 100644
--- a/tests/kafkatest/tests/core/get_offset_shell_test.py
+++ b/tests/kafkatest/tests/core/get_offset_shell_test.py
@@ -16,8 +16,9 @@
from ducktape.utils.util import wait_until
from ducktape.tests.test import Test
-from kafkatest.services.verifiable_producer import VerifiableProducer
+from ducktape.mark.resource import cluster
+from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.console_consumer import ConsoleConsumer
@@ -28,6 +29,7 @@ MAX_MESSAGES = 100
NUM_PARTITIONS = 1
REPLICATION_FACTOR = 1
+
class GetOffsetShellTest(Test):
"""
Tests GetOffsetShell tool
@@ -44,7 +46,6 @@ class GetOffsetShellTest(Test):
self.zk = ZookeeperService(test_context, self.num_zk)
-
def setUp(self):
self.zk.start()
@@ -69,6 +70,7 @@ class GetOffsetShellTest(Test):
consumer_timeout_ms=1000, new_consumer=enable_new_consumer)
self.consumer.start()
+ @cluster(num_nodes=4)
def test_get_offset_shell(self, security_protocol='PLAINTEXT'):
"""
Tests if GetOffsetShell is getting offsets correctly
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/mirror_maker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/mirror_maker_test.py b/tests/kafkatest/tests/core/mirror_maker_test.py
index afb1972..ce86a60 100644
--- a/tests/kafkatest/tests/core/mirror_maker_test.py
+++ b/tests/kafkatest/tests/core/mirror_maker_test.py
@@ -14,7 +14,8 @@
# limitations under the License.
from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize, matrix, ignore
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
@@ -110,8 +111,11 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
wait_until(lambda: self.producer.num_acked > n_messages, timeout_sec=10,
err_msg="Producer failed to produce %d messages in a reasonable amount of time." % n_messages)
+ @cluster(num_nodes=7)
@parametrize(security_protocol='PLAINTEXT', new_consumer=False)
- @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], new_consumer=[True])
+ @matrix(security_protocol=['PLAINTEXT', 'SSL'], new_consumer=[True])
+ @cluster(num_nodes=8)
+ @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], new_consumer=[True])
def test_simple_end_to_end(self, security_protocol, new_consumer):
"""
Test end-to-end behavior under non-failure conditions.
@@ -140,8 +144,11 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
self.run_produce_consume_validate(core_test_action=self.wait_for_n_messages)
self.mirror_maker.stop()
+ @cluster(num_nodes=7)
@matrix(offsets_storage=["kafka", "zookeeper"], new_consumer=[False], clean_shutdown=[True, False])
- @matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
+ @matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['PLAINTEXT', 'SSL'])
+ @cluster(num_nodes=8)
+ @matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'])
def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True, security_protocol='PLAINTEXT'):
"""
Test end-to-end behavior under failure conditions.
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/reassign_partitions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/reassign_partitions_test.py b/tests/kafkatest/tests/core/reassign_partitions_test.py
index 850e2aa..fef57d1 100644
--- a/tests/kafkatest/tests/core/reassign_partitions_test.py
+++ b/tests/kafkatest/tests/core/reassign_partitions_test.py
@@ -14,6 +14,7 @@
# limitations under the License.
from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until
from kafkatest.services.zookeeper import ZookeeperService
@@ -24,6 +25,7 @@ from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
import random
+
class ReassignPartitionsTest(ProduceConsumeValidateTest):
"""
These tests validate partition reassignment.
@@ -86,6 +88,7 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
# Wait until finished or timeout
wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info), timeout_sec=self.timeout_sec, backoff_sec=.5)
+ @cluster(num_nodes=7)
@parametrize(security_protocol="PLAINTEXT", bounce_brokers=True)
@parametrize(security_protocol="PLAINTEXT", bounce_brokers=False)
def test_reassign_partitions(self, bounce_brokers, security_protocol):
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py
index f815034..a95e9e5 100644
--- a/tests/kafkatest/tests/core/replication_test.py
+++ b/tests/kafkatest/tests/core/replication_test.py
@@ -16,6 +16,7 @@
from ducktape.utils.util import wait_until
from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
@@ -118,7 +119,7 @@ class ReplicationTest(ProduceConsumeValidateTest):
"""Override this since we're adding services outside of the constructor"""
return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
-
+ @cluster(num_nodes=7)
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
broker_type=["leader"],
security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"])
@@ -146,7 +147,7 @@ class ReplicationTest(ProduceConsumeValidateTest):
self.kafka.interbroker_security_protocol = security_protocol
self.kafka.client_sasl_mechanism = client_sasl_mechanism
self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
- new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True
+ new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
self.kafka.start()
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index 51b2e60..a21e845 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -20,8 +20,8 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.utils import is_int
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from ducktape.mark import parametrize
-from ducktape.mark import matrix
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
from kafkatest.services.security.kafka_acls import ACLs
import time
@@ -102,7 +102,10 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
# Bounce again with ACLs for new mechanism
self.set_authorizer_and_bounce(security_protocol, security_protocol)
- @matrix(client_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"])
+ @cluster(num_nodes=8)
+ @matrix(client_protocol=["SSL"])
+ @cluster(num_nodes=9)
+ @matrix(client_protocol=["SASL_PLAINTEXT", "SASL_SSL"])
def test_rolling_upgrade_phase_one(self, client_protocol):
"""
Start with a PLAINTEXT cluster, open a SECURED port, via a rolling upgrade, ensuring we could produce
@@ -123,6 +126,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.create_producer_and_consumer()
self.run_produce_consume_validate(lambda: time.sleep(1))
+ @cluster(num_nodes=8)
@matrix(client_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"], broker_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"])
def test_rolling_upgrade_phase_two(self, client_protocol, broker_protocol):
"""
@@ -143,6 +147,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
#Roll in the security protocol. Disable Plaintext. Ensure we can produce and Consume throughout
self.run_produce_consume_validate(self.roll_in_secured_settings, client_protocol, broker_protocol)
+ @cluster(num_nodes=9)
@parametrize(new_client_sasl_mechanism='PLAIN')
def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism):
"""
@@ -166,6 +171,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.create_producer_and_consumer()
self.run_produce_consume_validate(lambda: time.sleep(1))
+ @cluster(num_nodes=8)
@parametrize(new_sasl_mechanism='PLAIN')
def test_rolling_upgrade_sasl_mechanism_phase_two(self, new_sasl_mechanism):
"""