You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/12/12 02:43:27 UTC

[2/2] kafka git commit: KAFKA-4140: Upgrade to ducktape 0.6.0 and make system tests parallel friendly

KAFKA-4140: Upgrade to ducktape 0.6.0 and make system tests parallel friendly

Updates to take advantage of soon-to-be-released ducktape features.

Author: Geoff Anderson <ge...@confluent.io>
Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1834 from granders/systest-parallel-friendly


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62e043a8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62e043a8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62e043a8

Branch: refs/heads/trunk
Commit: 62e043a86565cc9bc485658d6c6d176e9aff620f
Parents: 6f7ed15
Author: Geoff Anderson <ge...@confluent.io>
Authored: Sun Dec 11 18:43:23 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Sun Dec 11 18:43:23 2016 -0800

----------------------------------------------------------------------
 .../kafkatest/benchmarks/core/benchmark_test.py |  13 ++-
 .../streams/streams_simple_benchmark_test.py    |   5 +-
 .../sanity_checks/test_console_consumer.py      |  12 ++-
 .../sanity_checks/test_kafka_version.py         |   3 +
 .../sanity_checks/test_performance_services.py  |   2 +
 .../sanity_checks/test_verifiable_producer.py   |   2 +
 tests/kafkatest/services/connect.py             |   3 +-
 tests/kafkatest/services/console_consumer.py    |   5 +-
 tests/kafkatest/services/kafka/kafka.py         |  10 +-
 .../kafkatest/services/kafka_log4j_appender.py  |   3 +-
 tests/kafkatest/services/mirror_maker.py        |   4 +-
 tests/kafkatest/services/monitor/jmx.py         |  44 ++++++--
 .../performance/producer_performance.py         |  12 ++-
 .../services/performance/streams_performance.py |   2 +-
 .../services/replica_verification_tool.py       |   3 +-
 tests/kafkatest/services/security/minikdc.py    |  43 +++++++-
 .../services/security/security_config.py        |  49 +++++----
 tests/kafkatest/services/verifiable_consumer.py |   4 +-
 tests/kafkatest/services/verifiable_producer.py | 102 ++++++++++++-------
 tests/kafkatest/services/zookeeper.py           |   8 +-
 .../kafkatest/tests/client/compression_test.py  |   3 +
 .../client/consumer_rolling_upgrade_test.py     |   4 +-
 tests/kafkatest/tests/client/consumer_test.py   |  11 +-
 .../tests/client/message_format_change_test.py  |   4 +-
 tests/kafkatest/tests/client/quota_test.py      |   2 +
 .../tests/connect/connect_distributed_test.py   |  30 ++++--
 .../tests/connect/connect_rest_test.py          |   9 +-
 tests/kafkatest/tests/connect/connect_test.py   |  19 ++--
 .../core/compatibility_test_new_broker_test.py  |   4 +-
 .../tests/core/consumer_group_command_test.py   |   4 +
 .../tests/core/get_offset_shell_test.py         |   6 +-
 tests/kafkatest/tests/core/mirror_maker_test.py |  13 ++-
 .../tests/core/reassign_partitions_test.py      |   3 +
 tests/kafkatest/tests/core/replication_test.py  |   5 +-
 .../tests/core/security_rolling_upgrade_test.py |  12 ++-
 tests/kafkatest/tests/core/security_test.py     |  55 ++++++----
 .../tests/core/simple_consumer_shell_test.py    |   4 +
 tests/kafkatest/tests/core/throttling_test.py   |   2 +
 tests/kafkatest/tests/core/upgrade_test.py      |   5 +
 .../core/zookeeper_security_upgrade_test.py     |   6 +-
 .../tests/streams/streams_bounce_test.py        |   8 +-
 .../tests/streams/streams_smoke_test.py         |  11 +-
 .../tests/tools/log4j_appender_test.py          |   8 +-
 .../tests/tools/replica_verification_test.py    |   8 +-
 tests/setup.py                                  |   2 +-
 45 files changed, 400 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/benchmarks/core/benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py
index 4dbf902..14fab2f 100644
--- a/tests/kafkatest/benchmarks/core/benchmark_test.py
+++ b/tests/kafkatest/benchmarks/core/benchmark_test.py
@@ -15,6 +15,7 @@
 
 from ducktape.mark import matrix
 from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
 from ducktape.services.service import Service
 from ducktape.tests.test import Test
 
@@ -63,11 +64,13 @@ class Benchmark(Test):
         self.kafka.log_level = "INFO"  # We don't DEBUG logging here
         self.kafka.start()
 
+    @cluster(num_nodes=5)
     @parametrize(acks=1, topic=TOPIC_REP_ONE)
     @parametrize(acks=1, topic=TOPIC_REP_THREE)
     @parametrize(acks=-1, topic=TOPIC_REP_THREE)
-    @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
     @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], compression_type=["none", "snappy"], security_protocol=['PLAINTEXT', 'SSL'])
+    @cluster(num_nodes=7)
+    @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
     def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE,
                                  compression_type="none", security_protocol='PLAINTEXT', client_version=str(TRUNK),
                                  broker_version=str(TRUNK)):
@@ -97,6 +100,7 @@ class Benchmark(Test):
         self.producer.run()
         return compute_aggregate_throughput(self.producer)
 
+    @cluster(num_nodes=5)
     @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
     @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
     def test_long_term_producer_throughput(self, compression_type="none", security_protocol='PLAINTEXT',
@@ -152,8 +156,11 @@ class Benchmark(Test):
         self.logger.info("\n".join(summary))
         return data
 
+    @cluster(num_nodes=5)
     @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], compression_type=["none", "snappy"])
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
+    @cluster(num_nodes=6)
+    @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], compression_type=["none", "snappy"])
     def test_end_to_end_latency(self, compression_type="none", security_protocol="PLAINTEXT",
                                 interbroker_security_protocol=None, client_version=str(TRUNK),
                                 broker_version=str(TRUNK)):
@@ -181,6 +188,7 @@ class Benchmark(Test):
         self.perf.run()
         return latency(self.perf.results[0]['latency_50th_ms'],  self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
 
+    @cluster(num_nodes=6)
     @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
     @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
     @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
@@ -229,6 +237,7 @@ class Benchmark(Test):
         self.logger.info("\n".join(summary))
         return data
 
+    @cluster(num_nodes=6)
     @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
     @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
     @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index de687e6..ab9b112 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -13,11 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.mark import ignore
+from ducktape.mark.resource import cluster
 
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.performance.streams_performance import StreamsSimpleBenchmarkService
-import time
+
 
 class StreamsSimpleBenchmarkTest(KafkaTest):
     """
@@ -29,6 +29,7 @@ class StreamsSimpleBenchmarkTest(KafkaTest):
 
         self.driver = StreamsSimpleBenchmarkService(test_context, self.kafka, 1000000L)
 
+    @cluster(num_nodes=3)
     def test_simple_benchmark(self):
         """
         Run simple Kafka Streams benchmark

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/sanity_checks/test_console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 18cbfb7..38db057 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -17,6 +17,7 @@ import time
 
 from ducktape.mark import matrix
 from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
 
@@ -42,9 +43,12 @@ class ConsoleConsumerTest(Test):
     def setUp(self):
         self.zk.start()
 
+    @cluster(num_nodes=3)
     @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+    @cluster(num_nodes=4)
     @parametrize(security_protocol='SASL_SSL', sasl_mechanism='PLAIN')
-    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
+    @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'])
     def test_lifecycle(self, security_protocol, new_consumer=True, sasl_mechanism='GSSAPI'):
         """Check that console consumer starts/stops properly, and that we are capturing log output."""
 
@@ -66,14 +70,16 @@ class ConsoleConsumerTest(Test):
 
         # Verify that log output is happening
         wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10,
-                   err_msg="Timed out waiting for logging to start.")
-        assert line_count(node, ConsoleConsumer.LOG_FILE) > 0
+                   err_msg="Timed out waiting for consumer log file to exist.")
+        wait_until(lambda: line_count(node, ConsoleConsumer.LOG_FILE) > 0, timeout_sec=1,
+                   backoff_sec=.25, err_msg="Timed out waiting for log entries to start.")
 
         # Verify no consumed messages
         assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0
 
         self.consumer.stop_node(node)
 
+    @cluster(num_nodes=4)
     def test_version(self):
         """Check that console consumer v0.8.2.X successfully starts and consumes messages."""
         self.kafka.start()

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/sanity_checks/test_kafka_version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_kafka_version.py b/tests/kafkatest/sanity_checks/test_kafka_version.py
index b33c590..3550093 100644
--- a/tests/kafkatest/sanity_checks/test_kafka_version.py
+++ b/tests/kafkatest/sanity_checks/test_kafka_version.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from ducktape.tests.test import Test
+from ducktape.mark.resource import cluster
 
 from kafkatest.services.kafka import KafkaService, config_property
 from kafkatest.services.zookeeper import ZookeeperService
@@ -32,6 +33,7 @@ class KafkaVersionTest(Test):
     def setUp(self):
         self.zk.start()
 
+    @cluster(num_nodes=2)
     def test_0_8_2(self):
         """Test kafka service node-versioning api - verify that we can bring up a single-node 0.8.2.X cluster."""
         self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
@@ -42,6 +44,7 @@ class KafkaVersionTest(Test):
 
         assert is_version(node, [LATEST_0_8_2])
 
+    @cluster(num_nodes=3)
     def test_multi_version(self):
         """Test kafka service node-versioning api - ensure we can bring up a 2-node cluster, one on version 0.8.2.X,
         the other on trunk."""

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/sanity_checks/test_performance_services.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_performance_services.py b/tests/kafkatest/sanity_checks/test_performance_services.py
index 7b5946a..b939f2b 100644
--- a/tests/kafkatest/sanity_checks/test_performance_services.py
+++ b/tests/kafkatest/sanity_checks/test_performance_services.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 
 from kafkatest.services.kafka import KafkaService
@@ -35,6 +36,7 @@ class PerformanceServiceTest(Test):
     def setUp(self):
         self.zk.start()
 
+    @cluster(num_nodes=5)
     # We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check,
     # the overhead should be manageable.
     @parametrize(version=str(LATEST_0_8_2), new_consumer=False)

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/sanity_checks/test_verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
index 23932f3..544d7b9 100644
--- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py
+++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
@@ -15,6 +15,7 @@
 
 
 from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
 
@@ -44,6 +45,7 @@ class TestVerifiableProducer(Test):
         self.zk.start()
         self.kafka.start()
 
+    @cluster(num_nodes=3)
     @parametrize(producer_version=str(LATEST_0_8_2))
     @parametrize(producer_version=str(LATEST_0_9))
     @parametrize(producer_version=str(TRUNK))

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 473eb0b..45140fc 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -304,7 +304,8 @@ class VerifiableConnector(object):
                     self.logger.debug("Ignoring unparseable line: %s", line)
                     continue
                 # Filter to only ones matching our name to support multiple verifiable producers
-                if data['name'] != self.name: continue
+                if data['name'] != self.name:
+                    continue
                 data['node'] = node
                 records.append(data)
         return records

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 050ea6d..6984fc9 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -15,10 +15,9 @@
 
 import itertools
 import os
-import subprocess
 
 from ducktape.services.background_thread import BackgroundThreadService
-from ducktape.utils.util import wait_until
+from ducktape.cluster.remoteaccount import RemoteCommandError
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.monitor.jmx import JmxMixin
@@ -211,7 +210,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
             cmd = "ps ax | grep -i console_consumer | grep java | grep -v grep | awk '{print $1}'"
             pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
             return pid_arr
-        except (subprocess.CalledProcessError, ValueError) as e:
+        except (RemoteCommandError, ValueError) as e:
             return []
 
     def alive(self, node):

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index c79f8c8..f773d8d 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -18,11 +18,11 @@ import json
 import os.path
 import re
 import signal
-import subprocess
 import time
 
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
+from ducktape.cluster.remoteaccount import RemoteCommandError
 
 from config import KafkaConfig
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
@@ -121,8 +121,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
     @property
     def security_config(self):
-        return SecurityConfig(self.security_protocol, self.interbroker_security_protocol,
-                              zk_sasl = self.zk.zk_sasl,
+        return SecurityConfig(self.context, self.security_protocol, self.interbroker_security_protocol,
+                              zk_sasl=self.zk.zk_sasl,
                               client_sasl_mechanism=self.client_sasl_mechanism, interbroker_sasl_mechanism=self.interbroker_sasl_mechanism)
 
     def open_port(self, protocol):
@@ -208,7 +208,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
         with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor:
             node.account.ssh(cmd)
-            monitor.wait_until("Kafka Server.*started", timeout_sec=30, err_msg="Kafka server didn't finish startup")
+            monitor.wait_until("Kafka Server.*started", timeout_sec=30, backoff_sec=.25, err_msg="Kafka server didn't finish startup")
 
         self.start_jmx_tool(self.idx(node), node)
         if len(self.pids(node)) == 0:
@@ -221,7 +221,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
             pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
             return pid_arr
-        except (subprocess.CalledProcessError, ValueError) as e:
+        except (RemoteCommandError, ValueError) as e:
             return []
 
     def signal_node(self, node, sig=signal.SIGTERM):

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/kafka_log4j_appender.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
index b25d8be..29a4202 100644
--- a/tests/kafkatest/services/kafka_log4j_appender.py
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -34,7 +34,8 @@ class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService):
         self.topic = topic
         self.max_messages = max_messages
         self.security_protocol = security_protocol
-        self.security_config = SecurityConfig(security_protocol)
+        self.security_config = SecurityConfig(self.context, security_protocol)
+        self.stop_timeout_sec = 30
 
     def _worker(self, idx, node):
         cmd = self.start_cmd(node)

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
index 14af4cf..c056705 100644
--- a/tests/kafkatest/services/mirror_maker.py
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -14,10 +14,10 @@
 # limitations under the License.
 
 import os
-import subprocess
 
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
+from ducktape.cluster.remoteaccount import RemoteCommandError
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 
@@ -145,7 +145,7 @@ class MirrorMaker(KafkaPathResolverMixin, Service):
             cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'"
             pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
             return pid_arr
-        except (subprocess.CalledProcessError, ValueError) as e:
+        except (RemoteCommandError, ValueError):
             return []
 
     def alive(self, node):

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/monitor/jmx.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
index 19ca5fd..e71040b 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -13,6 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.cluster.remoteaccount import RemoteCommandError
+from ducktape.utils.util import wait_until
+
 
 class JmxMixin(object):
     """This mixin helps existing service subclasses start JmxTool on their worker nodes and collect jmx stats.
@@ -31,12 +34,19 @@ class JmxMixin(object):
         self.maximum_jmx_value = {}  # map from object_attribute_name to maximum value observed over time
         self.average_jmx_value = {}  # map from object_attribute_name to average value observed over time
 
+        self.jmx_tool_log = "/mnt/jmx_tool.log"
+
     def clean_node(self, node):
         node.account.kill_process("jmx", clean_shutdown=False, allow_fail=True)
-        node.account.ssh("rm -rf /mnt/jmx_tool.log", allow_fail=False)
+        node.account.ssh("rm -rf %s" % self.jmx_tool_log, allow_fail=False)
 
     def start_jmx_tool(self, idx, node):
-        if self.started[idx-1] or self.jmx_object_names is None:
+        if self.jmx_object_names is None:
+            self.logger.debug("%s: Not starting jmx tool because no jmx objects are defined" % node.account)
+            return
+
+        if self.started[idx-1]:
+            self.logger.debug("%s: jmx tool has been started already on this node" % node.account)
             return
 
         cmd = "%s kafka.tools.JmxTool " % self.path.script("kafka-run-class.sh", node)
@@ -45,31 +55,43 @@ class JmxMixin(object):
             cmd += " --object-name %s" % jmx_object_name
         for jmx_attribute in self.jmx_attributes:
             cmd += " --attributes %s" % jmx_attribute
-        cmd += " | tee -a /mnt/jmx_tool.log"
-
-        self.logger.debug("Start JmxTool %d command: %s", idx, cmd)
-        jmx_output = node.account.ssh_capture(cmd, allow_fail=False)
-        jmx_output.next()
+        cmd += " >> %s &" % self.jmx_tool_log
 
+        self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd))
+        node.account.ssh(cmd, allow_fail=False)
+        wait_until(lambda: self._jmx_has_output(node), timeout_sec=5, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
         self.started[idx-1] = True
 
+    def _jmx_has_output(self, node):
+        """Helper used as a proxy to determine whether jmx is running, by checking that jmx_tool_log contains output."""
+        try:
+            node.account.ssh("test -z \"$(cat %s)\"" % self.jmx_tool_log, allow_fail=False)
+            return False
+        except RemoteCommandError:
+            return True
+
     def read_jmx_output(self, idx, node):
-        if self.started[idx-1] == False:
+        if not self.started[idx-1]:
             return
 
         object_attribute_names = []
 
-        cmd = "cat /mnt/jmx_tool.log"
+        cmd = "cat %s" % self.jmx_tool_log
         self.logger.debug("Read jmx output %d command: %s", idx, cmd)
-        for line in node.account.ssh_capture(cmd, allow_fail=False):
+        lines = [line for line in node.account.ssh_capture(cmd, allow_fail=False)]
+        assert len(lines) > 1, "There don't appear to be any samples in the jmx tool log: %s" % lines
+
+        for line in lines:
             if "time" in line:
                 object_attribute_names = line.strip()[1:-1].split("\",\"")[1:]
                 continue
             stats = [float(field) for field in line.split(',')]
             time_sec = int(stats[0]/1000)
-            self.jmx_stats[idx-1][time_sec] = {name : stats[i+1] for i, name in enumerate(object_attribute_names)}
+            self.jmx_stats[idx-1][time_sec] = {name: stats[i+1] for i, name in enumerate(object_attribute_names)}
 
         # do not calculate average and maximum of jmx stats until we have read output from all nodes
+        # If the service is multithreaded, this means that the results will be aggregated only when the last
+        # service finishes
         if any(len(time_to_stats) == 0 for time_to_stats in self.jmx_stats):
             return
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index 7a0ccdd..1113e0d 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -14,9 +14,9 @@
 # limitations under the License.
 
 import os
-import subprocess
-
+import time
 from ducktape.utils.util import wait_until
+from ducktape.cluster.remoteaccount import RemoteCommandError
 
 from kafkatest.directory_layout.kafka_path import  TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
 from kafkatest.services.monitor.jmx import JmxMixin
@@ -118,7 +118,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
             cmd = "jps | grep -i ProducerPerformance | awk '{print $1}'"
             pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
             return pid_arr
-        except (subprocess.CalledProcessError, ValueError) as e:
+        except (RemoteCommandError, ValueError) as e:
             return []
 
     def alive(self, node):
@@ -136,6 +136,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
         self.logger.debug("Producer performance %d command: %s", idx, cmd)
 
         # start ProducerPerformance process
+        start = time.time()
         producer_output = node.account.ssh_capture(cmd)
         wait_until(lambda: self.alive(node), timeout_sec=20, err_msg="ProducerPerformance failed to start")
         # block until there is at least one line of output
@@ -144,7 +145,10 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
             raise Exception("No output from ProducerPerformance")
 
         self.start_jmx_tool(idx, node)
-        wait_until(lambda: not self.alive(node), timeout_sec=1200, err_msg="ProducerPerformance failed to finish")
+        wait_until(lambda: not self.alive(node), timeout_sec=1200, backoff_sec=2, err_msg="ProducerPerformance failed to finish")
+        elapsed = time.time() - start
+        self.logger.debug("ProducerPerformance process ran for %s seconds" % elapsed)
+
         self.read_jmx_output(idx, node)
 
         # parse producer output from file

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/performance/streams_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py
index b7d6b89..0af13f9 100644
--- a/tests/kafkatest/services/performance/streams_performance.py
+++ b/tests/kafkatest/services/performance/streams_performance.py
@@ -78,7 +78,7 @@ class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service):
     def wait(self):
         for node in self.nodes:
             for pid in self.pids(node):
-                wait_until(lambda: not node.account.alive(pid), timeout_sec=600, err_msg="SimpleBenchmark process on " + str(node.account) + " took too long to exit")
+                wait_until(lambda: not node.account.alive(pid), timeout_sec=600, backoff_sec=1, err_msg="SimpleBenchmark process on " + str(node.account) + " took too long to exit")
 
     def clean_node(self, node):
         node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/replica_verification_tool.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py
index 2f29d16..a2753fd 100644
--- a/tests/kafkatest/services/replica_verification_tool.py
+++ b/tests/kafkatest/services/replica_verification_tool.py
@@ -36,7 +36,7 @@ class ReplicaVerificationTool(KafkaPathResolverMixin, BackgroundThreadService):
         self.topic = topic
         self.report_interval_ms = report_interval_ms
         self.security_protocol = security_protocol
-        self.security_config = SecurityConfig(security_protocol)
+        self.security_config = SecurityConfig(self.context, security_protocol)
         self.partition_lag = {}
         self.stop_timeout_sec = stop_timeout_sec
 
@@ -65,6 +65,7 @@ class ReplicaVerificationTool(KafkaPathResolverMixin, BackgroundThreadService):
         topic_partition = topic + ',' + str(partition)
         lag = self.partition_lag.get(topic_partition, -1)
         self.logger.debug("Retuning lag for {} as {}".format(topic_partition, lag))
+
         return lag
 
     def start_cmd(self, node):

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/security/minikdc.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/minikdc.py b/tests/kafkatest/services/security/minikdc.py
index 3189ddc..b3cbeae 100644
--- a/tests/kafkatest/services/security/minikdc.py
+++ b/tests/kafkatest/services/security/minikdc.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import os
+import random
 import uuid
 from io import open
 from os import remove, close
@@ -39,14 +40,48 @@ class MiniKdc(KafkaPathResolverMixin, Service):
     KEYTAB_FILE = "/mnt/minikdc/keytab"
     KRB5CONF_FILE = "/mnt/minikdc/krb5.conf"
     LOG_FILE = "/mnt/minikdc/minikdc.log"
-    LOCAL_KEYTAB_FILE = "/tmp/" + str(uuid.uuid4().get_hex()) + "_keytab"
-    LOCAL_KRB5CONF_FILE = "/tmp/" + str(uuid.uuid4().get_hex()) + "_krb5.conf"
+
+    LOCAL_KEYTAB_FILE = None
+    LOCAL_KRB5CONF_FILE = None
+
+    @staticmethod
+    def _set_local_keytab_file(local_scratch_dir):
+        """Set MiniKdc.LOCAL_KEYTAB_FILE exactly once per test.
+
+        LOCAL_KEYTAB_FILE is currently used like a global variable to provide a mechanism to share the
+        location of the local keytab file among all services which might need it.
+
+        Since individual ducktape tests are each run in a subprocess forked from the ducktape main process,
+        class variables set at class load time are duplicated between test processes. This leads to collisions
+        if test subprocesses are run in parallel, so we defer setting these class variables until after the test itself
+        begins to run.
+        """
+        if MiniKdc.LOCAL_KEYTAB_FILE is None:
+            MiniKdc.LOCAL_KEYTAB_FILE = os.path.join(local_scratch_dir, "keytab")
+        return MiniKdc.LOCAL_KEYTAB_FILE
+
+    @staticmethod
+    def _set_local_krb5conf_file(local_scratch_dir):
+        """Set MiniKdc.LOCAL_KRB5CONF_FILE exactly once per test.
+
+        See _set_local_keytab_file for details why we do this.
+        """
+
+        if MiniKdc.LOCAL_KRB5CONF_FILE is None:
+            MiniKdc.LOCAL_KRB5CONF_FILE = os.path.join(local_scratch_dir, "krb5conf")
+        return MiniKdc.LOCAL_KRB5CONF_FILE
 
     def __init__(self, context, kafka_nodes, extra_principals=""):
         super(MiniKdc, self).__init__(context, 1)
         self.kafka_nodes = kafka_nodes
         self.extra_principals = extra_principals
 
+        # context.local_scratch_dir uses a ducktape feature:
+        # each test_context object has a unique local scratch directory which is available for the duration of the test
+        # which is automatically garbage collected after the test finishes
+        MiniKdc._set_local_keytab_file(context.local_scratch_dir)
+        MiniKdc._set_local_krb5conf_file(context.local_scratch_dir)
+
     def replace_in_file(self, file_path, pattern, subst):
         fh, abs_path = mkstemp()
         with open(abs_path, 'w') as new_file:
@@ -80,8 +115,8 @@ class MiniKdc(KafkaPathResolverMixin, Service):
             node.account.ssh(cmd)
             monitor.wait_until("MiniKdc Running", timeout_sec=60, backoff_sec=1, err_msg="MiniKdc didn't finish startup")
 
-        node.account.scp_from(MiniKdc.KEYTAB_FILE, MiniKdc.LOCAL_KEYTAB_FILE)
-        node.account.scp_from(MiniKdc.KRB5CONF_FILE, MiniKdc.LOCAL_KRB5CONF_FILE)
+        node.account.copy_from(MiniKdc.KEYTAB_FILE, MiniKdc.LOCAL_KEYTAB_FILE)
+        node.account.copy_from(MiniKdc.KRB5CONF_FILE, MiniKdc.LOCAL_KRB5CONF_FILE)
 
         # KDC is set to bind openly (via 0.0.0.0). Change krb5.conf to hold the specific KDC address
         self.replace_in_file(MiniKdc.LOCAL_KRB5CONF_FILE, '0.0.0.0', node.account.hostname)

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/security/security_config.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index 665c4b0..9b29217 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import atexit
 import os
 import subprocess
 from tempfile import mkdtemp
@@ -22,21 +21,23 @@ from ducktape.template import TemplateRenderer
 from kafkatest.services.security.minikdc import MiniKdc
 import itertools
 
+
 class SslStores(object):
-    def __init__(self):
-        self.ca_and_truststore_dir = mkdtemp(dir="/tmp")
-        self.ca_crt_path = os.path.join(self.ca_and_truststore_dir, "test.ca.crt")
-        self.ca_jks_path = os.path.join(self.ca_and_truststore_dir, "test.ca.jks")
+    def __init__(self, local_scratch_dir):
+        self.ca_crt_path = os.path.join(local_scratch_dir, "test.ca.crt")
+        self.ca_jks_path = os.path.join(local_scratch_dir, "test.ca.jks")
         self.ca_passwd = "test-ca-passwd"
 
-        self.truststore_path = os.path.join(self.ca_and_truststore_dir, "test.truststore.jks")
+        self.truststore_path = os.path.join(local_scratch_dir, "test.truststore.jks")
         self.truststore_passwd = "test-ts-passwd"
         self.keystore_passwd = "test-ks-passwd"
         self.key_passwd = "test-key-passwd"
         # Allow upto one hour of clock skew between host and VMs
         self.startdate = "-1H"
-        # Register rmtree to run on exit
-        atexit.register(rmtree, self.ca_and_truststore_dir)
+
+        for file in [self.ca_crt_path, self.ca_jks_path, self.truststore_path]:
+            if os.path.exists(file):
+                os.remove(file)
 
     def generate_ca(self):
         """
@@ -69,7 +70,7 @@ class SslStores(object):
         self.runcmd("keytool -gencert -keystore %s -storepass %s -alias ca -infile %s -outfile %s -dname CN=systemtest -ext SAN=DNS:%s -startdate %s" % (self.ca_jks_path, self.ca_passwd, csr_path, crt_path, self.hostname(node), self.startdate))
         self.runcmd("keytool -importcert -keystore %s -storepass %s -alias ca -file %s -noprompt" % (ks_path, self.keystore_passwd, self.ca_crt_path))
         self.runcmd("keytool -importcert -keystore %s -storepass %s -keypass %s -alias kafka -file %s -noprompt" % (ks_path, self.keystore_passwd, self.key_passwd, crt_path))
-        node.account.scp_to(ks_path, SecurityConfig.KEYSTORE_PATH)
+        node.account.copy_to(ks_path, SecurityConfig.KEYSTORE_PATH)
         rmtree(ks_dir)
 
     def hostname(self, node):
@@ -79,9 +80,10 @@ class SslStores(object):
 
     def runcmd(self, cmd):
         proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
-        proc.communicate()
+        stdout, stderr = proc.communicate()
+
         if proc.returncode != 0:
-            raise subprocess.CalledProcessError(proc.returncode, cmd)
+            raise RuntimeError("Command '%s' returned non-zero exit status %d: %s" % (cmd, proc.returncode, stdout))
 
 
 class SecurityConfig(TemplateRenderer):
@@ -99,11 +101,10 @@ class SecurityConfig(TemplateRenderer):
     KRB5CONF_PATH = "/mnt/security/krb5.conf"
     KEYTAB_PATH = "/mnt/security/keytab"
 
-    ssl_stores = SslStores()
-    ssl_stores.generate_ca()
-    ssl_stores.generate_truststore()
+    # This is initialized only when the first instance of SecurityConfig is created
+    ssl_stores = None
 
-    def __init__(self, security_protocol=None, interbroker_security_protocol=None,
+    def __init__(self, context, security_protocol=None, interbroker_security_protocol=None,
                  client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
                  zk_sasl=False, template_props=""):
         """
@@ -114,6 +115,15 @@ class SecurityConfig(TemplateRenderer):
         template properties either, PLAINTEXT is used as default.
         """
 
+        self.context = context
+        if not SecurityConfig.ssl_stores:
+            # This generates keystore/truststore files in a local scratch directory which gets
+            # automatically destroyed after the test is run
+            # Creating within the scratch directory allows us to run tests in parallel without fear of collision
+            SecurityConfig.ssl_stores = SslStores(context.local_scratch_dir)
+            SecurityConfig.ssl_stores.generate_ca()
+            SecurityConfig.ssl_stores.generate_truststore()
+
         if security_protocol is None:
             security_protocol = self.get_property('security.protocol', template_props)
         if security_protocol is None:
@@ -140,13 +150,12 @@ class SecurityConfig(TemplateRenderer):
             'sasl.kerberos.service.name' : 'kafka'
         }
 
-
     def client_config(self, template_props=""):
-        return SecurityConfig(self.security_protocol, client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props)
+        return SecurityConfig(self.context, self.security_protocol, client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props)
 
     def setup_ssl(self, node):
         node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
-        node.account.scp_to(SecurityConfig.ssl_stores.truststore_path, SecurityConfig.TRUSTSTORE_PATH)
+        node.account.copy_to(SecurityConfig.ssl_stores.truststore_path, SecurityConfig.TRUSTSTORE_PATH)
         SecurityConfig.ssl_stores.generate_and_copy_keystore(node)
 
     def setup_sasl(self, node):
@@ -162,8 +171,8 @@ class SecurityConfig(TemplateRenderer):
                                 enabled_sasl_mechanisms=self.enabled_sasl_mechanisms)
         node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
         if self.has_sasl_kerberos:
-            node.account.scp_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH)
-            node.account.scp_to(MiniKdc.LOCAL_KRB5CONF_FILE, SecurityConfig.KRB5CONF_PATH)
+            node.account.copy_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH)
+            node.account.copy_to(MiniKdc.LOCAL_KRB5CONF_FILE, SecurityConfig.KRB5CONF_PATH)
 
     def setup_node(self, node):
         if self.has_ssl:

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/verifiable_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 9c6abdd..c593e2a 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -16,9 +16,9 @@
 import json
 import os
 import signal
-import subprocess
 
 from ducktape.services.background_thread import BackgroundThreadService
+from ducktape.cluster.remoteaccount import RemoteCommandError
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.kafka import TopicPartition
@@ -243,7 +243,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
             cmd = "jps | grep -i VerifiableConsumer | awk '{print $1}'"
             pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
             return pid_arr
-        except (subprocess.CalledProcessError, ValueError) as e:
+        except (RemoteCommandError, ValueError) as e:
             return []
 
     def try_parse_json(self, string):

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index dbdf71f..205143e 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -16,19 +16,22 @@
 import json
 import os
 import signal
-import subprocess
 import time
 
 from ducktape.services.background_thread import BackgroundThreadService
+from ducktape.cluster.remoteaccount import RemoteCommandError
+from ducktape.utils.util import wait_until
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin, TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
 from kafkatest.utils import is_int, is_int_with_prefix
 from kafkatest.version import TRUNK, LATEST_0_8_2
+from kafkatest.utils.remote_account import line_count
 
 
 class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
     PERSISTENT_ROOT = "/mnt/verifiable_producer"
     STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stdout")
+    STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stderr")
     LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
     LOG_FILE = os.path.join(LOG_DIR, "verifiable_producer.log")
     LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
@@ -38,6 +41,9 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
         "verifiable_producer_stdout": {
             "path": STDOUT_CAPTURE,
             "collect_default": False},
+        "verifiable_producer_stderr": {
+            "path": STDERR_CAPTURE,
+            "collect_default": False},
         "verifiable_producer_log": {
             "path": LOG_FILE,
             "collect_default": True}
@@ -114,37 +120,63 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
         self.produced_count[idx] = 0
         last_produced_time = time.time()
         prev_msg = None
-        for line in node.account.ssh_capture(cmd):
-            line = line.strip()
-
-            data = self.try_parse_json(line)
-            if data is not None:
-
-                with self.lock:
-                    if data["name"] == "producer_send_error":
-                        data["node"] = idx
-                        self.not_acked_values.append(self.message_validator(data["value"]))
-                        self.produced_count[idx] += 1
-
-                    elif data["name"] == "producer_send_success":
-                        self.acked_values.append(self.message_validator(data["value"]))
-                        self.produced_count[idx] += 1
-
-                        # Log information if there is a large gap between successively acknowledged messages
-                        t = time.time()
-                        time_delta_sec = t - last_produced_time
-                        if time_delta_sec > 2 and prev_msg is not None:
-                            self.logger.debug(
-                                "Time delta between successively acked messages is large: " +
-                                "delta_t_sec: %s, prev_message: %s, current_message: %s" % (str(time_delta_sec), str(prev_msg), str(data)))
-
-                        last_produced_time = t
-                        prev_msg = data
-
-                    elif data["name"] == "shutdown_complete":
-                        if node in self.clean_shutdown_nodes:
-                            raise Exception("Unexpected shutdown event from producer, already shutdown. Producer index: %d" % idx)
-                        self.clean_shutdown_nodes.add(node)
+        node.account.ssh(cmd)
+
+        # Ensure that STDOUT_CAPTURE exists before trying to read from it
+        # Note that if max_messages is configured, it's possible for the process to exit before this
+        # wait_until condition is checked
+        start = time.time()
+        wait_until(lambda: node.account.isfile(VerifiableProducer.STDOUT_CAPTURE) and
+                           line_count(node, VerifiableProducer.STDOUT_CAPTURE) > 0,
+                   timeout_sec=10, err_msg="%s: VerifiableProducer took too long to start" % node.account)
+        self.logger.debug("%s: VerifiableProducer took %s seconds to start" % (node.account, time.time() - start))
+
+        with node.account.open(VerifiableProducer.STDOUT_CAPTURE, 'r') as f:
+            while True:
+                line = f.readline()
+                if line == '' and not self.alive(node):
+                    # The process is gone, and we've reached the end of the output file, so we don't expect
+                    # any more output to appear in the STDOUT_CAPTURE file
+                    break
+
+                line = line.strip()
+
+                data = self.try_parse_json(line)
+                if data is not None:
+
+                    with self.lock:
+                        if data["name"] == "producer_send_error":
+                            data["node"] = idx
+                            self.not_acked_values.append(self.message_validator(data["value"]))
+                            self.produced_count[idx] += 1
+
+                        elif data["name"] == "producer_send_success":
+                            self.acked_values.append(self.message_validator(data["value"]))
+                            self.produced_count[idx] += 1
+
+                            # Log information if there is a large gap between successively acknowledged messages
+                            t = time.time()
+                            time_delta_sec = t - last_produced_time
+                            if time_delta_sec > 2 and prev_msg is not None:
+                                self.logger.debug(
+                                    "Time delta between successively acked messages is large: " +
+                                    "delta_t_sec: %s, prev_message: %s, current_message: %s" % (str(time_delta_sec), str(prev_msg), str(data)))
+
+                            last_produced_time = t
+                            prev_msg = data
+
+                        elif data["name"] == "shutdown_complete":
+                            if node in self.clean_shutdown_nodes:
+                                raise Exception("Unexpected shutdown event from producer, already shutdown. Producer index: %d" % idx)
+                            self.clean_shutdown_nodes.add(node)
+
+    def _has_output(self, node):
+        """Helper that checks whether the VerifiableProducer STDOUT_CAPTURE file on the node contains any output."""
+        try:
+            node.account.ssh("test -z \"$(cat %s)\"" % VerifiableProducer.STDOUT_CAPTURE, allow_fail=False)
+            return False
+        except RemoteCommandError:
+            return True
 
     def start_cmd(self, node, idx):
         cmd = ""
@@ -171,10 +203,10 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
         if self.message_validator == is_int_with_prefix:
             cmd += " --value-prefix %s" % str(idx)
         if self.acks is not None:
-            cmd += " --acks %s\n" % str(self.acks)
+            cmd += " --acks %s " % str(self.acks)
 
         cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
-        cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
+        cmd += " 2>> %s 1>> %s &" % (VerifiableProducer.STDERR_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
         return cmd
 
     def kill_node(self, node, clean_shutdown=True, allow_fail=False):
@@ -190,7 +222,7 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
             cmd = "jps | grep -i VerifiableProducer | awk '{print $1}'"
             pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
             return pid_arr
-        except (subprocess.CalledProcessError, ValueError) as e:
+        except (RemoteCommandError, ValueError) as e:
             return []
 
     def alive(self, node):

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/services/zookeeper.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index 2019889..8d38d48 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -15,10 +15,11 @@
 
 
 import re
-import subprocess
 import time
 
 from ducktape.services.service import Service
+from ducktape.utils.util import wait_until
+from ducktape.cluster.remoteaccount import RemoteCommandError
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.security.security_config import SecurityConfig
@@ -46,7 +47,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
 
     @property
     def security_config(self):
-        return SecurityConfig(zk_sasl=self.zk_sasl)
+        return SecurityConfig(self.context, zk_sasl=self.zk_sasl)
 
     @property
     def security_system_properties(self):
@@ -85,7 +86,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
             cmd = "ps ax | grep -i zookeeper | grep java | grep -v grep | awk '{print $1}'"
             pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
             return pid_arr
-        except (subprocess.CalledProcessError, ValueError) as e:
+        except (RemoteCommandError, ValueError) as e:
             return []
 
     def alive(self, node):
@@ -95,6 +96,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         idx = self.idx(node)
         self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
         node.account.kill_process("zookeeper", allow_fail=False)
+        wait_until(lambda: not self.alive(node), timeout_sec=5, err_msg="Timed out waiting for zookeeper to stop.")
 
     def clean_node(self, node):
         self.logger.info("Cleaning ZK node %d on %s", self.idx(node), node.account.hostname)

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/client/compression_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/compression_test.py b/tests/kafkatest/tests/client/compression_test.py
index 0de53ae..9301de4 100644
--- a/tests/kafkatest/tests/client/compression_test.py
+++ b/tests/kafkatest/tests/client/compression_test.py
@@ -15,6 +15,7 @@
 
 from ducktape.mark import parametrize
 from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
@@ -23,6 +24,7 @@ from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int_with_prefix
 
+
 class CompressionTest(ProduceConsumeValidateTest):
     """
     These tests validate produce / consume for compressed topics.
@@ -51,6 +53,7 @@ class CompressionTest(ProduceConsumeValidateTest):
         # Override this since we're adding services outside of the constructor
         return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers
 
+    @cluster(num_nodes=7)
     @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=True)
     @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=False)
     def test_compressed_topic(self, compression_types, new_consumer):

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
index 3cd3c7c..e5904b1 100644
--- a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
@@ -13,7 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
+
 
 from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
 from kafkatest.services.kafka import TopicPartition
@@ -43,6 +44,7 @@ class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
             frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
             frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
 
+    @cluster(num_nodes=4)
     def rolling_update_test(self):
         """
         Verify rolling updates of partition assignment strategies works correctly. In this

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/client/consumer_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index 534f65c..a68e23e 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -15,12 +15,14 @@
 
 from ducktape.mark import matrix
 from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
 
 from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
 from kafkatest.services.kafka import TopicPartition
 
 import signal
 
+
 class OffsetValidationTest(VerifiableConsumerTest):
     TOPIC = "test_topic"
     NUM_PARTITIONS = 1
@@ -72,6 +74,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
         return consumer
 
+    @cluster(num_nodes=7)
     def test_broker_rolling_bounce(self):
         """
         Verify correct consumer behavior when the brokers are consecutively restarted.
@@ -112,6 +115,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         assert consumer.current_position(partition) == consumer.total_consumed(), \
             "Total consumed records did not match consumed position"
 
+    @cluster(num_nodes=7)
     @matrix(clean_shutdown=[True, False], bounce_mode=["all", "rolling"])
     def test_consumer_bounce(self, clean_shutdown, bounce_mode):
         """
@@ -152,6 +156,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
             assert consumer.current_position(partition) <= consumer.total_consumed(), \
                 "Current position greater than the total number of consumed records"
 
+    @cluster(num_nodes=7)
     @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
     def test_consumer_failure(self, clean_shutdown, enable_autocommit):
         partition = TopicPartition(self.TOPIC, 0)
@@ -194,7 +199,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
             assert consumer.last_commit(partition) == consumer.current_position(partition), \
                 "Last committed offset did not match last consumed position"
 
-
+    @cluster(num_nodes=7)
     @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
     def test_broker_failure(self, clean_shutdown, enable_autocommit):
         partition = TopicPartition(self.TOPIC, 0)
@@ -229,6 +234,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
             assert consumer.last_commit(partition) == consumer.current_position(partition), \
                 "Last committed offset did not match last consumed position"
 
+    @cluster(num_nodes=7)
     def test_group_consumption(self):
         """
         Verifies correct group rebalance behavior as consumers are started and stopped. 
@@ -277,6 +283,7 @@ class AssignmentValidationTest(VerifiableConsumerTest):
             self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 },
         })
 
+    @cluster(num_nodes=6)
     @matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
                                  "org.apache.kafka.clients.consumer.RoundRobinAssignor"])
     def test_valid_assignment(self, assignment_strategy):
@@ -294,4 +301,4 @@ class AssignmentValidationTest(VerifiableConsumerTest):
             consumer.start_node(node)
             self.await_members(consumer, num_started)
             assert self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment())
-            
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/client/message_format_change_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/message_format_change_test.py b/tests/kafkatest/tests/client/message_format_change_test.py
index a57c04b..edcead2 100644
--- a/tests/kafkatest/tests/client/message_format_change_test.py
+++ b/tests/kafkatest/tests/client/message_format_change_test.py
@@ -14,6 +14,7 @@
 
 from ducktape.mark import parametrize
 from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
 
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.kafka import KafkaService
@@ -55,7 +56,8 @@ class MessageFormatChangeTest(ProduceConsumeValidateTest):
             lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
             timeout_sec=120, backoff_sec=1,
             err_msg="Producer did not produce all messages in reasonable amount of time"))
-        
+
+    @cluster(num_nodes=10)
     @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
     @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
     def test_compatibility(self, producer_version, consumer_version):

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/client/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
index 1d31569..baed837 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -15,6 +15,7 @@
 
 from ducktape.tests.test import Test
 from ducktape.mark import matrix, parametrize
+from ducktape.mark.resource import cluster
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
@@ -124,6 +125,7 @@ class QuotaTest(Test):
         """Override this since we're adding services outside of the constructor"""
         return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
 
+    @cluster(num_nodes=5)
     @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False])
     @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
     def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index ee0a222..f49bb5d 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -14,18 +14,22 @@
 # limitations under the License.
 
 from ducktape.tests.test import Test
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+from ducktape.mark import matrix, parametrize
+from ducktape.cluster.remoteaccount import RemoteCommandError
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink, ConnectRestError, MockSink, MockSource
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
-from ducktape.utils.util import wait_until
-from ducktape.mark import matrix
-import subprocess, itertools, time
+
+import itertools, time
 from collections import Counter, namedtuple
 import operator
 
+
 class ConnectDistributedTest(Test):
     """
     Simple test of Kafka Connect in distributed mode, producing data from files on one cluster and consuming it on
@@ -139,6 +143,7 @@ class ConnectDistributedTest(Test):
         status = self._connector_status(connector.name, node)
         return self._task_has_state(task_id, status, 'RUNNING')
 
+    @cluster(num_nodes=5)
     def test_restart_failed_connector(self):
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -155,6 +160,7 @@ class ConnectDistributedTest(Test):
         wait_until(lambda: self.connector_is_running(self.sink), timeout_sec=10,
                    err_msg="Failed to see connector transition to the RUNNING state")
 
+    @cluster(num_nodes=5)
     @matrix(connector_type=["source", "sink"])
     def test_restart_failed_task(self, connector_type):
         self.setup_services()
@@ -178,7 +184,7 @@ class ConnectDistributedTest(Test):
         wait_until(lambda: self.task_is_running(connector, task_id), timeout_sec=10,
                    err_msg="Failed to see task transition to the RUNNING state")
 
-
+    @cluster(num_nodes=5)
     def test_pause_and_resume_source(self):
         """
         Verify that source connectors stop producing records when paused and begin again after
@@ -217,6 +223,7 @@ class ConnectDistributedTest(Test):
         wait_until(lambda: len(self.source.messages()) > num_messages, timeout_sec=30,
                    err_msg="Failed to produce messages after resuming source connector")
 
+    @cluster(num_nodes=5)
     def test_pause_and_resume_sink(self):
         """
         Verify that sink connectors stop consuming records when paused and begin again after
@@ -259,7 +266,7 @@ class ConnectDistributedTest(Test):
         wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=30,
                    err_msg="Failed to consume messages after resuming source connector")
 
-
+    @cluster(num_nodes=5)
     def test_pause_state_persistent(self):
         """
         Verify that paused state is preserved after a cluster restart.
@@ -284,7 +291,10 @@ class ConnectDistributedTest(Test):
             wait_until(lambda: self.is_paused(self.source, node), timeout_sec=30,
                        err_msg="Failed to see connector startup in PAUSED state")
 
-    @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
+    @cluster(num_nodes=5)
+    @parametrize(security_protocol=SecurityConfig.PLAINTEXT)
+    @cluster(num_nodes=6)
+    @parametrize(security_protocol=SecurityConfig.SASL_SSL)
     def test_file_source_and_sink(self, security_protocol):
         """
         Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
@@ -315,7 +325,7 @@ class ConnectDistributedTest(Test):
             node.account.ssh("echo -e -n " + repr(self.SECOND_INPUTS) + " >> " + self.INPUT_FILE)
         wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=70, err_msg="Sink output file never converged to the same state as the input file")
 
-
+    @cluster(num_nodes=5)
     @matrix(clean=[True, False])
     def test_bounce(self, clean):
         """
@@ -424,8 +434,6 @@ class ConnectDistributedTest(Test):
 
         assert success, "Found validation errors:\n" + "\n  ".join(errors)
 
-
-
     def _validate_file_output(self, input):
         input_set = set(input)
         # Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled.
@@ -437,8 +445,8 @@ class ConnectDistributedTest(Test):
 
     def _file_contents(self, node, file):
         try:
-            # Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of
+            # Convert to a list here or the RemoteCommandError may be raised during a call to the generator instead of
             # immediately
             return list(node.account.ssh_capture("cat " + file))
-        except subprocess.CalledProcessError:
+        except RemoteCommandError:
             return []

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/connect/connect_rest_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py
index 70bc32c..098790b 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -16,7 +16,9 @@
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.connect import ConnectDistributedService, ConnectRestError
 from ducktape.utils.util import wait_until
-import subprocess
+from ducktape.mark.resource import cluster
+from ducktape.cluster.remoteaccount import RemoteCommandError
+
 import json
 import itertools
 
@@ -57,6 +59,7 @@ class ConnectRestApiTest(KafkaTest):
 
         self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE])
 
+    @cluster(num_nodes=4)
     def test_rest_api(self):
         # Template parameters
         self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
@@ -171,10 +174,10 @@ class ConnectRestApiTest(KafkaTest):
 
     def file_contents(self, node, file):
         try:
-            # Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of
+            # Convert to a list here or the RemoteCommandError may be raised during a call to the generator instead of
             # immediately
             return list(node.account.ssh_capture("cat " + file))
-        except subprocess.CalledProcessError:
+        except RemoteCommandError:
             return []
 
     def _config_dict_from_props(self, connector_props):

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/connect/connect_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 83acb4a..9436119 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -14,15 +14,20 @@
 # limitations under the License.
 
 from ducktape.tests.test import Test
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+from ducktape.mark import parametrize, matrix
+from ducktape.cluster.remoteaccount import RemoteCommandError
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.connect import ConnectStandaloneService
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
-from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize, matrix
-import hashlib, subprocess, json
+
+import hashlib
+import json
+
 
 class ConnectStandaloneFileTest(Test):
     """
@@ -58,10 +63,13 @@ class ConnectStandaloneFileTest(Test):
 
         self.zk = ZookeeperService(test_context, self.num_zk)
 
+    @cluster(num_nodes=5)
     @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=True)
     @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=False)
     @parametrize(converter="org.apache.kafka.connect.storage.StringConverter", schemas=None)
-    @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
+    @parametrize(security_protocol=SecurityConfig.PLAINTEXT)
+    @cluster(num_nodes=6)
+    @parametrize(security_protocol=SecurityConfig.SASL_SSL)
     def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True, security_protocol='PLAINTEXT'):
         """
         Validates basic end-to-end functionality of Connect standalone using the file source and sink converters. Includes
@@ -85,7 +93,6 @@ class ConnectStandaloneFileTest(Test):
         self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC,
                                                   consumer_timeout_ms=1000)
 
-
         self.zk.start()
         self.kafka.start()
 
@@ -118,5 +125,5 @@ class ConnectStandaloneFileTest(Test):
         try:
             output_hash = list(self.sink.node.account.ssh_capture("md5sum " + self.OUTPUT_FILE))[0].strip().split()[0]
             return output_hash == hashlib.md5(value).hexdigest()
-        except subprocess.CalledProcessError:
+        except RemoteCommandError:
             return False

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
index d6a0a12..f3931ec 100644
--- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
+++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
@@ -14,6 +14,7 @@
 
 from ducktape.mark import parametrize
 from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
 
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.kafka import KafkaService
@@ -43,6 +44,7 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
         self.num_consumers = 1
         self.messages_per_producer = 1000
 
+    @cluster(num_nodes=6)
     @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None)
     @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
     @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=None)
@@ -54,7 +56,7 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
     @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
     @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=str("LogAppendTime"))
     def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None):
-       
+
         self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
                                                                     "partitions": 3,
                                                                     "replication-factor": 3,

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/consumer_group_command_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/consumer_group_command_test.py b/tests/kafkatest/tests/core/consumer_group_command_test.py
index c3f59d9..c03022a 100644
--- a/tests/kafkatest/tests/core/consumer_group_command_test.py
+++ b/tests/kafkatest/tests/core/consumer_group_command_test.py
@@ -17,6 +17,7 @@
 from ducktape.utils.util import wait_until
 from ducktape.tests.test import Test
 from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
@@ -28,6 +29,7 @@ import re
 
 TOPIC = "topic-consumer-group-command"
 
+
 class ConsumerGroupCommandTest(Test):
     """
     Tests ConsumerGroupCommand
@@ -89,6 +91,7 @@ class ConsumerGroupCommandTest(Test):
 
         self.consumer.stop()
 
+    @cluster(num_nodes=3)
     @matrix(security_protocol=['PLAINTEXT', 'SSL'])
     def test_list_consumer_groups(self, security_protocol='PLAINTEXT'):
         """
@@ -97,6 +100,7 @@ class ConsumerGroupCommandTest(Test):
         """
         self.setup_and_verify(security_protocol)
 
+    @cluster(num_nodes=3)
     @matrix(security_protocol=['PLAINTEXT', 'SSL'])
     def test_describe_consumer_group(self, security_protocol='PLAINTEXT'):
         """

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/get_offset_shell_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/get_offset_shell_test.py b/tests/kafkatest/tests/core/get_offset_shell_test.py
index 38bd9dc..e45365d 100644
--- a/tests/kafkatest/tests/core/get_offset_shell_test.py
+++ b/tests/kafkatest/tests/core/get_offset_shell_test.py
@@ -16,8 +16,9 @@
 
 from ducktape.utils.util import wait_until
 from ducktape.tests.test import Test
-from kafkatest.services.verifiable_producer import VerifiableProducer
+from ducktape.mark.resource import cluster
 
+from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.console_consumer import ConsoleConsumer
@@ -28,6 +29,7 @@ MAX_MESSAGES = 100
 NUM_PARTITIONS = 1
 REPLICATION_FACTOR = 1
 
+
 class GetOffsetShellTest(Test):
     """
     Tests GetOffsetShell tool
@@ -44,7 +46,6 @@ class GetOffsetShellTest(Test):
         self.zk = ZookeeperService(test_context, self.num_zk)
 
 
-
     def setUp(self):
         self.zk.start()
 
@@ -69,6 +70,7 @@ class GetOffsetShellTest(Test):
                                         consumer_timeout_ms=1000, new_consumer=enable_new_consumer)
         self.consumer.start()
 
+    @cluster(num_nodes=4)
     def test_get_offset_shell(self, security_protocol='PLAINTEXT'):
         """
         Tests if GetOffsetShell is getting offsets correctly

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/mirror_maker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/mirror_maker_test.py b/tests/kafkatest/tests/core/mirror_maker_test.py
index afb1972..ce86a60 100644
--- a/tests/kafkatest/tests/core/mirror_maker_test.py
+++ b/tests/kafkatest/tests/core/mirror_maker_test.py
@@ -14,7 +14,8 @@
 # limitations under the License.
 
 from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize, matrix, ignore
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
@@ -110,8 +111,11 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
         wait_until(lambda: self.producer.num_acked > n_messages, timeout_sec=10,
                      err_msg="Producer failed to produce %d messages in a reasonable amount of time." % n_messages)
 
+    @cluster(num_nodes=7)
     @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
-    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], new_consumer=[True])
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'], new_consumer=[True])
+    @cluster(num_nodes=8)
+    @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], new_consumer=[True])
     def test_simple_end_to_end(self, security_protocol, new_consumer):
         """
         Test end-to-end behavior under non-failure conditions.
@@ -140,8 +144,11 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
         self.run_produce_consume_validate(core_test_action=self.wait_for_n_messages)
         self.mirror_maker.stop()
 
+    @cluster(num_nodes=7)
     @matrix(offsets_storage=["kafka", "zookeeper"], new_consumer=[False], clean_shutdown=[True, False])
-    @matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
+    @matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['PLAINTEXT', 'SSL'])
+    @cluster(num_nodes=8)
+    @matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'])
     def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True, security_protocol='PLAINTEXT'):
         """
         Test end-to-end behavior under failure conditions.

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/reassign_partitions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/reassign_partitions_test.py b/tests/kafkatest/tests/core/reassign_partitions_test.py
index 850e2aa..fef57d1 100644
--- a/tests/kafkatest/tests/core/reassign_partitions_test.py
+++ b/tests/kafkatest/tests/core/reassign_partitions_test.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 
 from kafkatest.services.zookeeper import ZookeeperService
@@ -24,6 +25,7 @@ from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
 import random
 
+
 class ReassignPartitionsTest(ProduceConsumeValidateTest):
     """
     These tests validate partition reassignment.
@@ -86,6 +88,7 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
         # Wait until finished or timeout
         wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info), timeout_sec=self.timeout_sec, backoff_sec=.5)
 
+    @cluster(num_nodes=7)
     @parametrize(security_protocol="PLAINTEXT", bounce_brokers=True)
     @parametrize(security_protocol="PLAINTEXT", bounce_brokers=False)
     def test_reassign_partitions(self, bounce_brokers, security_protocol):

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py
index f815034..a95e9e5 100644
--- a/tests/kafkatest/tests/core/replication_test.py
+++ b/tests/kafkatest/tests/core/replication_test.py
@@ -16,6 +16,7 @@
 from ducktape.utils.util import wait_until
 
 from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
@@ -118,7 +119,7 @@ class ReplicationTest(ProduceConsumeValidateTest):
         """Override this since we're adding services outside of the constructor"""
         return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
 
-
+    @cluster(num_nodes=7)
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["leader"],
             security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"])
@@ -146,7 +147,7 @@ class ReplicationTest(ProduceConsumeValidateTest):
         self.kafka.interbroker_security_protocol = security_protocol
         self.kafka.client_sasl_mechanism = client_sasl_mechanism
         self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
-        new_consumer = False if  self.kafka.security_protocol == "PLAINTEXT" else True
+        new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True
         self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
         self.kafka.start()

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index 51b2e60..a21e845 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -20,8 +20,8 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.utils import is_int
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from ducktape.mark import parametrize
-from ducktape.mark import matrix
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
 from kafkatest.services.security.kafka_acls import ACLs
 import time
 
@@ -102,7 +102,10 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         # Bounce again with ACLs for new mechanism
         self.set_authorizer_and_bounce(security_protocol, security_protocol)
 
-    @matrix(client_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"])
+    @cluster(num_nodes=8)
+    @matrix(client_protocol=["SSL"])
+    @cluster(num_nodes=9)
+    @matrix(client_protocol=["SASL_PLAINTEXT", "SASL_SSL"])
     def test_rolling_upgrade_phase_one(self, client_protocol):
         """
         Start with a PLAINTEXT cluster, open a SECURED port, via a rolling upgrade, ensuring we could produce
@@ -123,6 +126,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.create_producer_and_consumer()
         self.run_produce_consume_validate(lambda: time.sleep(1))
 
+    @cluster(num_nodes=8)
     @matrix(client_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"], broker_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"])
     def test_rolling_upgrade_phase_two(self, client_protocol, broker_protocol):
         """
@@ -143,6 +147,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         #Roll in the security protocol. Disable Plaintext. Ensure we can produce and Consume throughout
         self.run_produce_consume_validate(self.roll_in_secured_settings, client_protocol, broker_protocol)
 
+    @cluster(num_nodes=9)
     @parametrize(new_client_sasl_mechanism='PLAIN')
     def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism):
         """
@@ -166,6 +171,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.create_producer_and_consumer()
         self.run_produce_consume_validate(lambda: time.sleep(1))
 
+    @cluster(num_nodes=8)
     @parametrize(new_sasl_mechanism='PLAIN')
     def test_rolling_upgrade_sasl_mechanism_phase_two(self, new_sasl_mechanism):
         """