Posted to commits@kafka.apache.org by gu...@apache.org on 2015/07/29 02:21:44 UTC

[2/2] kafka git commit: KAFKA-2276; KIP-25 initial patch

KAFKA-2276; KIP-25 initial patch

Initial patch for KIP-25

Note that to install ducktape, do *not* use pip. Instead, install from source:

```
$ git clone git@github.com:confluentinc/ducktape.git
$ cd ducktape
$ python setup.py install
```
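
Once ducktape is installed this way, the system tests added by this patch can be run from the tests/ directory; a minimal sketch, using the same commands documented in tests/README.md below:

```
$ cd kafka/tests
$ ducktape kafkatest/tests
```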

Author: Geoff Anderson <ge...@confluent.io>
Author: Geoff <gr...@gmail.com>
Author: Liquan Pei <li...@gmail.com>

Reviewers: Ewen, Gwen, Jun, Guozhang

Closes #70 from granders/KAFKA-2276 and squashes the following commits:

a62fb6c [Geoff Anderson] fixed checkstyle errors
a70f0f8 [Geoff Anderson] Merged in upstream trunk.
8b62019 [Geoff Anderson] Merged in upstream trunk.
47b7b64 [Geoff Anderson] Created separate tools jar so that the clients package does not pull in dependencies on the Jackson JSON tools or argparse4j.
a9e6a14 [Geoff Anderson] Merged in upstream changes
d18db7b [Geoff Anderson] fixed :rat errors (needed to add licenses)
321fdf8 [Geoff Anderson] Ignore tests/ and vagrant/ directories when running rat build task
795fc75 [Geoff Anderson] Merged in changes from upstream trunk.
1d93f06 [Geoff Anderson] Updated provisioning to use java 7 in light of KAFKA-2316
2ea4e29 [Geoff Anderson] Tweaked README, changed default log collection behavior on VerifiableProducer
0eb6fdc [Geoff Anderson] Merged in system-tests
69dd7be [Geoff Anderson] Merged in trunk
4034dd6 [Geoff Anderson] Merged in upstream trunk
ede6450 [Geoff] Merge pull request #4 from confluentinc/move_muckrake
7751545 [Geoff Anderson] Corrected license headers
e6d532f [Geoff Anderson] java 7 -> java 6
8c61e2d [Geoff Anderson] Reverted jdk back to 6
f14c507 [Geoff Anderson] Removed mode = "test" from Vagrantfile and Vagrantfile.local examples. Updated testing README to clarify aws setup.
98b7253 [Geoff Anderson] Updated consumer tests to pre-populate kafka logs
e6a41f1 [Geoff Anderson] removed stray println
b15b24f [Geoff Anderson] leftover KafkaBenchmark in super call
0f75187 [Geoff Anderson] Removed stray allow_fail. kafka_benchmark_test -> benchmark_test
f469f84 [Geoff Anderson] Tweaked readme, added example Vagrantfile.local
3d73857 [Geoff Anderson] Merged downstream changes
42dcdb1 [Geoff Anderson] Tweaked behavior of stop_node, clean_node to generally fail fast
7f7c3e0 [Geoff Anderson] Updated setup.py for kafkatest
c60125c [Geoff Anderson] TestEndToEndLatency -> EndToEndLatency
4f476fe [Geoff Anderson] Moved aws scripts to vagrant directory
5af88fc [Geoff Anderson] Updated README to include aws quickstart
e5edf03 [Geoff Anderson] Updated example aws Vagrantfile.local
96533c3 [Geoff] Update aws-access-keys-commands
25a413d [Geoff] Update aws-example-Vagrantfile.local
884b20e [Geoff Anderson] Moved a bunch of files to kafkatest directory
fc7c81c [Geoff Anderson] added setup.py
632be12 [Geoff] Merge pull request #3 from confluentinc/verbose-client
51a94fd [Geoff Anderson] Use argparse4j instead of joptsimple. ThroughputThrottler now has more intuitive behavior when targetThroughput is 0.
a80a428 [Geoff Anderson] Added shell program for VerifiableProducer.
d586fb0 [Geoff Anderson] Updated comments to reflect that throttler is not message-specific
6842ed1 [Geoff Anderson] left out a file from last commit
1228eef [Geoff Anderson] Renamed throttler
9100417 [Geoff Anderson] Updated command-line options for VerifiableProducer. Extracted throughput logic to make it reusable.
0a5de8e [Geoff Anderson] Fixed checkstyle errors. Changed name to VerifiableProducer. Added synchronization for thread safety on println statements.
475423b [Geoff Anderson] Convert class to string before adding to json object.
bc009f2 [Geoff Anderson] Got rid of VerboseProducer in core (moved to clients)
c0526fe [Geoff Anderson] Updates per review comments.
8b4b1f2 [Geoff Anderson] Minor updates to VerboseProducer
2777712 [Geoff Anderson] Added some metadata to producer output.
da94b8c [Geoff Anderson] Added number of messages option.
07cd1c6 [Geoff Anderson] Added simple producer which prints status of produced messages to stdout.
a278988 [Geoff Anderson] fixed typos
f1914c3 [Liquan Pei] Merge pull request #2 from confluentinc/system_tests
81e4156 [Liquan Pei] Bootstrap Kafka system tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e43c9aff
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e43c9aff
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e43c9aff

Branch: refs/heads/trunk
Commit: e43c9aff92c57da6abb0c1d0af3431a550110a89
Parents: f4101ab
Author: Geoff Anderson <ge...@confluent.io>
Authored: Tue Jul 28 17:22:14 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Jul 28 17:22:14 2015 -0700

----------------------------------------------------------------------
 .gitignore                                      |   5 +
 Vagrantfile                                     |  39 ++-
 bin/kafka-run-class.sh                          |  10 +
 bin/kafka-verifiable-producer.sh                |  20 ++
 build.gradle                                    |  60 +++-
 checkstyle/import-control.xml                   |   2 +
 .../clients/tools/ProducerPerformance.java      | 219 -------------
 .../scala/kafka/tools/EndToEndLatency.scala     |  92 ++++++
 .../scala/kafka/tools/ProducerPerformance.scala |   4 +-
 .../scala/kafka/tools/TestEndToEndLatency.scala |  91 ------
 settings.gradle                                 |   3 +-
 tests/.gitignore                                |  11 +
 tests/README.md                                 | 150 +++++++++
 tests/kafkatest/__init__.py                     |  16 +
 tests/kafkatest/services/__init__.py            |  15 +
 tests/kafkatest/services/console_consumer.py    | 146 +++++++++
 tests/kafkatest/services/kafka.py               | 227 ++++++++++++++
 tests/kafkatest/services/performance.py         | 163 ++++++++++
 .../templates/console_consumer.properties       |  19 ++
 .../services/templates/kafka.properties         | 121 ++++++++
 .../services/templates/zookeeper.properties     |  25 ++
 tests/kafkatest/services/verifiable_producer.py | 107 +++++++
 tests/kafkatest/services/zookeeper.py           |  64 ++++
 tests/kafkatest/tests/__init__.py               |  15 +
 tests/kafkatest/tests/benchmark_test.py         | 274 +++++++++++++++++
 tests/kafkatest/tests/kafka_test.py             |  45 +++
 tests/kafkatest/tests/replication_test.py       | 165 ++++++++++
 tests/setup.py                                  |  27 ++
 .../clients/tools/ProducerPerformance.java      | 201 ++++++++++++
 .../clients/tools/ThroughputThrottler.java      | 118 +++++++
 .../kafka/clients/tools/VerifiableProducer.java | 307 +++++++++++++++++++
 vagrant/aws/aws-access-keys-commands            |  29 ++
 vagrant/aws/aws-example-Vagrantfile.local       |  28 ++
 vagrant/aws/aws-init.sh                         |  73 +++++
 vagrant/base.sh                                 |   9 +
 vagrant/system-test-Vagrantfile.local           |  22 ++
 36 files changed, 2603 insertions(+), 319 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 1f3ba7d..4aae6e7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -29,3 +29,8 @@ config/server-*
 config/zookeeper-*
 core/data/*
 gradle/wrapper/*
+
+results
+tests/results
+.ducktape
+tests/.ducktape

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/Vagrantfile
----------------------------------------------------------------------
diff --git a/Vagrantfile b/Vagrantfile
index 1d7cc01..ee8b352 100644
--- a/Vagrantfile
+++ b/Vagrantfile
@@ -31,6 +31,7 @@ ram_megabytes = 1280
 # EC2
 ec2_access_key = ENV['AWS_ACCESS_KEY']
 ec2_secret_key = ENV['AWS_SECRET_KEY']
+ec2_session_token = ENV['AWS_SESSION_TOKEN']
 ec2_keypair_name = nil
 ec2_keypair_file = nil
 
@@ -50,6 +51,24 @@ if File.exists?(local_config_file) then
   eval(File.read(local_config_file), binding, "Vagrantfile.local")
 end
 
+# This is a horrible hack to work around bad interactions between
+# vagrant-hostmanager and vagrant-aws/vagrant's implementation. Hostmanager
+# wants to update the /etc/hosts entries, but tries to do so even on nodes that
+# aren't up (e.g. even when all nodes are stopped and you run vagrant
+# destroy). Because of the way the underlying code in vagrant works, it still
+# tries to communicate with the node and has to wait for a very long
+# timeout. This modifies the update to check for hosts that are not created or
+# stopped, skipping the update in that case since it's impossible to update
+# nodes in that state.
+Object.const_get("VagrantPlugins").const_get("HostManager").const_get("HostsFile").class_eval do
+  alias_method :old_update_guest, :update_guest
+  def update_guest(machine)
+    state_id = machine.state.id
+    return if state_id == :not_created || state_id == :stopped
+    old_update_guest(machine)
+  end
+end
+
 # TODO(ksweeney): RAM requirements are not empirical and can probably be significantly lowered.
 Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
   config.hostmanager.enabled = true
@@ -85,13 +104,31 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
     override.vm.box = "dummy"
     override.vm.box_url = "https://github.com/mitchellh/vagrant-aws/raw/master/dummy.box"
 
-    override.hostmanager.ignore_private_ip = true
+    cached_addresses = {}
+    # Use a custom resolver that SSH's into the machine and finds the IP address
+    # directly. This lets us get at the private IP address directly, avoiding
+    # some issues with using the default IP resolver, which uses the public IP
+    # address.
+    override.hostmanager.ip_resolver = proc do |vm, resolving_vm|
+      if !cached_addresses.has_key?(vm.name)
+        state_id = vm.state.id
+        if state_id != :not_created && state_id != :stopped && vm.communicate.ready?
+          vm.communicate.execute("/sbin/ifconfig eth0 | grep 'inet addr' | tail -n 1 | egrep -o '[0-9\.]+' | head -n 1 2>&1") do |type, contents|
+            cached_addresses[vm.name] = contents.split("\n").first[/(\d+\.\d+\.\d+\.\d+)/, 1]
+          end
+        else
+          cached_addresses[vm.name] = nil
+        end
+      end
+      cached_addresses[vm.name]
+    end
 
     override.ssh.username = ec2_user
     override.ssh.private_key_path = ec2_keypair_file
 
     aws.access_key_id = ec2_access_key
     aws.secret_access_key = ec2_secret_key
+    aws.session_token = ec2_session_token
     aws.keypair_name = ec2_keypair_name
 
     aws.region = ec2_region
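
A minimal usage sketch of the new session-token support: the Vagrantfile now reads the credentials from the environment variables shown above, so temporary (STS) credentials can be exported before bringing up AWS workers. The values below are placeholders; the vagrant invocation is the one documented in tests/README.md later in this patch.

```
$ export AWS_ACCESS_KEY=...     # placeholder
$ export AWS_SECRET_KEY=...     # placeholder
$ export AWS_SESSION_TOKEN=...  # placeholder; only needed for temporary credentials
$ vagrant up --provider=aws --no-provision --no-parallel && vagrant provision
```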

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 8c3fa28..ebe7409 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -65,6 +65,16 @@ do
   CLASSPATH=$CLASSPATH:$file
 done
 
+for file in $base_dir/tools/build/libs/kafka-tools*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $base_dir/tools/build/dependant-libs-${SCALA_VERSION}*/*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
 # classpath addition for release
 for file in $base_dir/libs/*.jar;
 do

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/bin/kafka-verifiable-producer.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-verifiable-producer.sh b/bin/kafka-verifiable-producer.sh
new file mode 100755
index 0000000..d0aa6c5
--- /dev/null
+++ b/bin/kafka-verifiable-producer.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
+    export KAFKA_HEAP_OPTS="-Xmx512M"
+fi
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.clients.tools.VerifiableProducer $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 9b6eb51..1b67e62 100644
--- a/build.gradle
+++ b/build.gradle
@@ -204,20 +204,20 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
   }
 }
 
-tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar']) {
+tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar', 'tools:jar']) {
 }
 
-tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar']) { }
+tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar', 'tools:srcJar']) { }
 
-tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar']) { }
+tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar', 'tools:docsJar']) { }
 
-tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7', 'clients:test', 'log4j-appender:test']) {
+tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7', 'clients:test', 'log4j-appender:test', 'tools:test']) {
 }
 
 tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_10_5', 'releaseTarGz_2_11_7']) {
 }
 
-tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives']) {
+tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives', 'tools:uploadArchives']) {
 }
 
 project(':core') {
@@ -413,6 +413,56 @@ project(':clients') {
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
 
+project(':tools') {
+    apply plugin: 'checkstyle'
+    archivesBaseName = "kafka-tools"
+
+    dependencies {
+        compile project(':clients')
+        compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
+        compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'
+        compile "$slf4jlog4j"
+
+        testCompile 'junit:junit:4.6'
+        testCompile project(path: ':clients', configuration: 'archives')
+    }
+
+    task testJar(type: Jar) {
+        classifier = 'test'
+        from sourceSets.test.output
+    }
+
+    test {
+        testLogging {
+            events "passed", "skipped", "failed"
+            exceptionFormat = 'full'
+        }
+    }
+
+    javadoc {
+        include "**/org/apache/kafka/tools/*"
+    }
+
+    tasks.create(name: "copyDependantLibs", type: Copy) {
+        from (configurations.testRuntime) {
+            include('slf4j-log4j12*')
+        }
+        from (configurations.runtime) {
+            exclude('kafka-clients*')
+        }
+        into "$buildDir/dependant-libs-${scalaVersion}"
+    }
+
+    jar {
+        dependsOn 'copyDependantLibs'
+    }
+
+    checkstyle {
+        configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+    }
+    test.dependsOn('checkstyleMain', 'checkstyleTest')
+}
+
 project(':log4j-appender') {
   apply plugin: 'checkstyle'
   archivesBaseName = "kafka-log4j-appender"

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 19e0659..18be1bb 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -92,6 +92,8 @@
 		<subpackage name="tools">
 			<allow pkg="org.apache.kafka.clients.producer" />
 			<allow pkg="org.apache.kafka.clients.consumer" />
+            <allow pkg="com.fasterxml.jackson" />
+            <allow pkg="net.sourceforge.argparse4j" />
 		</subpackage>
 	</subpackage>
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
deleted file mode 100644
index 13f4d59..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.tools;
-
-import java.util.Arrays;
-import java.util.Properties;
-
-import org.apache.kafka.clients.producer.*;
-
-public class ProducerPerformance {
-
-    private static final long NS_PER_MS = 1000000L;
-    private static final long NS_PER_SEC = 1000 * NS_PER_MS;
-    private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
-
-    public static void main(String[] args) throws Exception {
-        if (args.length < 4) {
-            System.err.println("USAGE: java " + ProducerPerformance.class.getName() +
-                               " topic_name num_records record_size target_records_sec [prop_name=prop_value]*");
-            System.exit(1);
-        }
-
-        /* parse args */
-        String topicName = args[0];
-        long numRecords = Long.parseLong(args[1]);
-        int recordSize = Integer.parseInt(args[2]);
-        int throughput = Integer.parseInt(args[3]);
-
-        Properties props = new Properties();
-        for (int i = 4; i < args.length; i++) {
-            String[] pieces = args[i].split("=");
-            if (pieces.length != 2)
-                throw new IllegalArgumentException("Invalid property: " + args[i]);
-            props.put(pieces[0], pieces[1]);
-        }
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
-
-        /* setup perf test */
-        byte[] payload = new byte[recordSize];
-        Arrays.fill(payload, (byte) 1);
-        ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload);
-        long sleepTime = NS_PER_SEC / throughput;
-        long sleepDeficitNs = 0;
-        Stats stats = new Stats(numRecords, 5000);
-        long start = System.currentTimeMillis();
-        for (int i = 0; i < numRecords; i++) {
-            long sendStart = System.currentTimeMillis();
-            Callback cb = stats.nextCompletion(sendStart, payload.length, stats);
-            producer.send(record, cb);
-
-            /*
-             * Maybe sleep a little to control throughput. Sleep time can be a bit inaccurate for times < 1 ms so
-             * instead of sleeping each time instead wait until a minimum sleep time accumulates (the "sleep deficit")
-             * and then make up the whole deficit in one longer sleep.
-             */
-            if (throughput > 0) {
-                float elapsed = (sendStart - start) / 1000.f;
-                if (elapsed > 0 && i / elapsed > throughput) {
-                    sleepDeficitNs += sleepTime;
-                    if (sleepDeficitNs >= MIN_SLEEP_NS) {
-                        long sleepMs = sleepDeficitNs / 1000000;
-                        long sleepNs = sleepDeficitNs - sleepMs * 1000000;
-                        Thread.sleep(sleepMs, (int) sleepNs);
-                        sleepDeficitNs = 0;
-                    }
-                }
-            }
-        }
-
-        /* print final results */
-        producer.close();
-        stats.printTotal();
-    }
-
-    private static class Stats {
-        private long start;
-        private long windowStart;
-        private int[] latencies;
-        private int sampling;
-        private int iteration;
-        private int index;
-        private long count;
-        private long bytes;
-        private int maxLatency;
-        private long totalLatency;
-        private long windowCount;
-        private int windowMaxLatency;
-        private long windowTotalLatency;
-        private long windowBytes;
-        private long reportingInterval;
-
-        public Stats(long numRecords, int reportingInterval) {
-            this.start = System.currentTimeMillis();
-            this.windowStart = System.currentTimeMillis();
-            this.index = 0;
-            this.iteration = 0;
-            this.sampling = (int) (numRecords / Math.min(numRecords, 500000));
-            this.latencies = new int[(int) (numRecords / this.sampling) + 1];
-            this.index = 0;
-            this.maxLatency = 0;
-            this.totalLatency = 0;
-            this.windowCount = 0;
-            this.windowMaxLatency = 0;
-            this.windowTotalLatency = 0;
-            this.windowBytes = 0;
-            this.totalLatency = 0;
-            this.reportingInterval = reportingInterval;
-        }
-
-        public void record(int iter, int latency, int bytes, long time) {
-            this.count++;
-            this.bytes += bytes;
-            this.totalLatency += latency;
-            this.maxLatency = Math.max(this.maxLatency, latency);
-            this.windowCount++;
-            this.windowBytes += bytes;
-            this.windowTotalLatency += latency;
-            this.windowMaxLatency = Math.max(windowMaxLatency, latency);
-            if (iter % this.sampling == 0) {
-                this.latencies[index] = latency;
-                this.index++;
-            }
-            /* maybe report the recent perf */
-            if (time - windowStart >= reportingInterval) {
-                printWindow();
-                newWindow();
-            }
-        }
-
-        public Callback nextCompletion(long start, int bytes, Stats stats) {
-            Callback cb = new PerfCallback(this.iteration, start, bytes, stats);
-            this.iteration++;
-            return cb;
-        }
-
-        public void printWindow() {
-            long ellapsed = System.currentTimeMillis() - windowStart;
-            double recsPerSec = 1000.0 * windowCount / (double) ellapsed;
-            double mbPerSec = 1000.0 * this.windowBytes / (double) ellapsed / (1024.0 * 1024.0);
-            System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f max latency.\n",
-                              windowCount,
-                              recsPerSec,
-                              mbPerSec,
-                              windowTotalLatency / (double) windowCount,
-                              (double) windowMaxLatency);
-        }
-
-        public void newWindow() {
-            this.windowStart = System.currentTimeMillis();
-            this.windowCount = 0;
-            this.windowMaxLatency = 0;
-            this.windowTotalLatency = 0;
-            this.windowBytes = 0;
-        }
-
-        public void printTotal() {
-            long ellapsed = System.currentTimeMillis() - start;
-            double recsPerSec = 1000.0 * count / (double) ellapsed;
-            double mbPerSec = 1000.0 * this.bytes / (double) ellapsed / (1024.0 * 1024.0);
-            int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
-            System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n",
-                              count,
-                              recsPerSec,
-                              mbPerSec,
-                              totalLatency / (double) count,
-                              (double) maxLatency,
-                              percs[0],
-                              percs[1],
-                              percs[2],
-                              percs[3]);
-        }
-
-        private static int[] percentiles(int[] latencies, int count, double... percentiles) {
-            int size = Math.min(count, latencies.length);
-            Arrays.sort(latencies, 0, size);
-            int[] values = new int[percentiles.length];
-            for (int i = 0; i < percentiles.length; i++) {
-                int index = (int) (percentiles[i] * size);
-                values[i] = latencies[index];
-            }
-            return values;
-        }
-    }
-
-    private static final class PerfCallback implements Callback {
-        private final long start;
-        private final int iteration;
-        private final int bytes;
-        private final Stats stats;
-
-        public PerfCallback(int iter, long start, int bytes, Stats stats) {
-            this.start = start;
-            this.stats = stats;
-            this.iteration = iter;
-            this.bytes = bytes;
-        }
-
-        public void onCompletion(RecordMetadata metadata, Exception exception) {
-            long now = System.currentTimeMillis();
-            int latency = (int) (now - start);
-            this.stats.record(iteration, latency, bytes, now);
-            if (exception != null)
-                exception.printStackTrace();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/core/src/main/scala/kafka/tools/EndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
new file mode 100755
index 0000000..7bb69b7
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.util.{Arrays, Properties}
+
+import kafka.consumer._
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+
+import scala.Option.option2Iterable
+
+object EndToEndLatency {
+  def main(args: Array[String]) {
+    if (args.length != 6) {
+      System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks")
+      System.exit(1)
+    }
+
+    val brokerList = args(0)
+    val zkConnect = args(1)
+    val topic = args(2)
+    val numMessages = args(3).toInt
+    val consumerFetchMaxWait = args(4).toInt
+    val producerAcks = args(5).toInt
+
+    val consumerProps = new Properties()
+    consumerProps.put("group.id", topic)
+    consumerProps.put("auto.commit.enable", "false")
+    consumerProps.put("auto.offset.reset", "largest")
+    consumerProps.put("zookeeper.connect", zkConnect)
+    consumerProps.put("fetch.wait.max.ms", consumerFetchMaxWait.toString)
+    consumerProps.put("socket.timeout.ms", 1201000.toString)
+
+    val config = new ConsumerConfig(consumerProps)
+    val connector = Consumer.create(config)
+    val stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
+    val iter = stream.iterator
+
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
+    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
+    producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+
+    // make sure the consumer fetcher has started before sending data since otherwise
+    // the consumption from the tail will skip the first message and hence be blocked
+    Thread.sleep(5000)
+
+    val message = "hello there beautiful".getBytes
+    var totalTime = 0.0
+    val latencies = new Array[Long](numMessages)
+    for (i <- 0 until numMessages) {
+      val begin = System.nanoTime
+      producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message))
+      val received = iter.next
+      val elapsed = System.nanoTime - begin
+      // poor man's progress bar
+      if (i % 1000 == 0)
+        println(i + "\t" + elapsed / 1000.0 / 1000.0)
+      totalTime += elapsed
+      latencies(i) = (elapsed / 1000 / 1000)
+    }
+    println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 / 1000.0))
+    Arrays.sort(latencies)
+    val p50 = latencies((latencies.length * 0.5).toInt)
+    val p99 = latencies((latencies.length * 0.99).toInt) 
+    val p999 = latencies((latencies.length * 0.999).toInt)
+    println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, p999))
+    producer.close()
+    connector.commitOffsets(true)
+    connector.shutdown()
+    System.exit(0)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index 0ebfa59..0335cc6 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -115,9 +115,9 @@ object ProducerPerformance extends Logging {
       .defaultsTo(0)
     val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
     val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
-      "set, the csv metrics will be outputed here")
+      "set, the csv metrics will be output here")
       .withRequiredArg
-      .describedAs("metrics dictory")
+      .describedAs("metrics directory")
       .ofType(classOf[java.lang.String])
     val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/core/src/test/scala/kafka/tools/TestEndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/test/scala/kafka/tools/TestEndToEndLatency.scala
deleted file mode 100755
index 99b77a1..0000000
--- a/core/src/test/scala/kafka/tools/TestEndToEndLatency.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
-import kafka.consumer._
-import java.util.Properties
-import java.util.Arrays
-import scala.Option.option2Iterable
-
-object TestEndToEndLatency {
-  def main(args: Array[String]) {
-    if (args.length != 6) {
-      System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks")
-      System.exit(1)
-    }
-
-    val brokerList = args(0)
-    val zkConnect = args(1)
-    val topic = args(2)
-    val numMessages = args(3).toInt
-    val consumerFetchMaxWait = args(4).toInt
-    val producerAcks = args(5).toInt
-
-    val consumerProps = new Properties()
-    consumerProps.put("group.id", topic)
-    consumerProps.put("auto.commit.enable", "false")
-    consumerProps.put("auto.offset.reset", "largest")
-    consumerProps.put("zookeeper.connect", zkConnect)
-    consumerProps.put("fetch.wait.max.ms", consumerFetchMaxWait.toString)
-    consumerProps.put("socket.timeout.ms", 1201000.toString)
-
-    val config = new ConsumerConfig(consumerProps)
-    val connector = Consumer.create(config)
-    val stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
-    val iter = stream.iterator
-
-    val producerProps = new Properties()
-    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
-    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
-    producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
-    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-    val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
-
-    // make sure the consumer fetcher has started before sending data since otherwise
-    // the consumption from the tail will skip the first message and hence be blocked
-    Thread.sleep(5000)
-
-    val message = "hello there beautiful".getBytes
-    var totalTime = 0.0
-    val latencies = new Array[Long](numMessages)
-    for (i <- 0 until numMessages) {
-      val begin = System.nanoTime
-      producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message))
-      val received = iter.next
-      val elapsed = System.nanoTime - begin
-      // poor man's progress bar
-      if (i % 1000 == 0)
-        println(i + "\t" + elapsed / 1000.0 / 1000.0)
-      totalTime += elapsed
-      latencies(i) = (elapsed / 1000 / 1000)
-    }
-    println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 / 1000.0))
-    Arrays.sort(latencies)
-    val p50 = latencies((latencies.length * 0.5).toInt)
-    val p99 = latencies((latencies.length * 0.99).toInt) 
-    val p999 = latencies((latencies.length * 0.999).toInt)
-    println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, p999))
-    producer.close()
-    connector.commitOffsets(true)
-    connector.shutdown()
-    System.exit(0)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 3b6a952..1944917 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -14,4 +14,5 @@
 // limitations under the License.
 
 apply from: file('scala.gradle')
-include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'log4j-appender'
+include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'log4j-appender'
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/.gitignore
----------------------------------------------------------------------
diff --git a/tests/.gitignore b/tests/.gitignore
new file mode 100644
index 0000000..b218b83
--- /dev/null
+++ b/tests/.gitignore
@@ -0,0 +1,11 @@
+Vagrantfile.local
+
+.idea/
+
+*.pyc
+*.ipynb
+
+.DS_Store
+
+.ducktape
+results/

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/README.md
----------------------------------------------------------------------
diff --git a/tests/README.md b/tests/README.md
new file mode 100644
index 0000000..ffbc0d5
--- /dev/null
+++ b/tests/README.md
@@ -0,0 +1,150 @@
+System Integration & Performance Testing
+========================================
+
+This directory contains Kafka system integration and performance tests. 
+[ducktape](https://github.com/confluentinc/ducktape) is used to run the tests.  
+(ducktape is a distributed testing framework which provides test runner, 
+result reporter and utilities to pull up and tear down services.) 
+
+Local Quickstart
+----------------
+This quickstart will help you run the Kafka system tests on your local machine.
+
+* Install Virtual Box from [https://www.virtualbox.org/](https://www.virtualbox.org/) (run `$ vboxmanage --version` to check if it's installed).
+* Install Vagrant >= 1.6.4 from [http://www.vagrantup.com/](http://www.vagrantup.com/) (run `vagrant --version` to check if it's installed).
+* Install Vagrant Plugins:
+
+        # Required
+        $ vagrant plugin install vagrant-hostmanager vagrant-cachier
+
+* Build a specific branch of Kafka
+       
+        $ cd kafka
+        $ git checkout $BRANCH
+        $ gradle
+        $ ./gradlew jar
+      
+* Setup a testing cluster with Vagrant. Configure your Vagrant setup by creating the file 
+   `Vagrantfile.local` in the directory of your Kafka checkout. For testing purposes,
+  `num_brokers` and `num_kafka` should be 0, and `num_workers` should be set high enough
+  to run all of your tests. An example resides in kafka/vagrant/system-test-Vagrantfile.local
+
+        # Example Vagrantfile.local for use on local machine
+        # Vagrantfile.local should reside in the base Kafka directory
+        num_zookeepers = 0
+        num_kafka = 0
+        num_workers = 9
+
+* Bring up the cluster (note that the initial provisioning process can be slow since it involves
+installing dependencies and updates on every vm.):
+
+        $ vagrant up
+
+* Install ducktape:
+       
+        $ pip install ducktape
+
+* Run the system tests using ducktape:
+
+        $ cd tests
+        $ ducktape kafkatest/tests
+
+* If you make changes to your Kafka checkout, you'll need to rebuild and resync to your Vagrant cluster:
+
+        $ cd kafka
+        $ ./gradlew jar
+        $ vagrant rsync # Re-syncs build output to cluster
+        
+EC2 Quickstart
+--------------
+This quickstart will help you run the Kafka system tests on EC2. In this setup, all logic is run
+on EC2 and none on your local machine. 
+
+There are a lot of steps here, but the basic goals are to create one distinguished EC2 instance that
+will be our "test driver", and to set up the security groups and iam role so that the test driver
+can create, destroy, and run ssh commands on any number of "workers".
+
+As a convention, we'll use "kafkatest" in most names, but you can use whatever name you want. 
+
+Preparation
+-----------
+In these steps, we will create an IAM role which has permission to create and destroy EC2 instances, 
+set up a keypair used for ssh access to the test driver and worker machines, and create a security group to allow the test driver and workers to all communicate via TCP.
+
+* [Create an IAM role](http://docs.aws.amazon.com/IAM/latest/UserGuide/Using_SettingUpUser.html#Using_CreateUser_console). We'll give this role the ability to launch or kill additional EC2 machines.
+ - Create role "kafkatest-master"
+ - Role type: Amazon EC2
+ - Attach policy: AmazonEC2FullAccess (this will allow our test-driver to create and destroy EC2 instances)
+ 
+* If you haven't already, [set up a keypair to use for SSH access](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html). For the purpose
+of this quickstart, let's say the keypair name is kafkatest, and you've saved the private key in kafkatest.pem
+
+* Next, create a security group called "kafkatest". 
+ - After creating the group, inbound rules: allow SSH on port 22 from anywhere; also, allow access on all ports (0-65535) from other machines in the kafkatest group.
+
+Create the Test Driver
+----------------------
+* Launch a new test driver machine 
+ - OS: Ubuntu server is recommended
+ - Instance type: t2.medium is easily enough since this machine is just a driver
+ - Instance details: Most defaults are fine.
+ - IAM role -> kafkatest-master
+ - Tagging the instance with a useful name is recommended. 
+ - Security group -> 'kafkatest'
+  
+
+* Once the machine is started, upload the SSH key to your test driver:
+
+        $ scp -i /path/to/kafkatest.pem \
+            /path/to/kafkatest.pem ubuntu@public.hostname.amazonaws.com:kafkatest.pem
+
+* Grab the public hostname/IP (available for example by navigating to your EC2 dashboard and viewing running instances) of your test driver and SSH into it:
+
+        $ ssh -i /path/to/kafkatest.pem ubuntu@public.hostname.amazonaws.com
+        
+Set Up the Test Driver
+----------------------
+The following steps assume you have ssh'd into
+the test driver machine.
+
+* Start by making sure you're up to date, and install git and ducktape:
+
+        $ sudo apt-get update && sudo apt-get -y upgrade && sudo apt-get install -y git
+        $ pip install ducktape
+
+* Get Kafka:
+
+        $ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
+        
+* Install some dependencies:
+
+        $ cd kafka
+        $ kafka/vagrant/aws/aws-init.sh
+        $ . ~/.bashrc
+
+* An example Vagrantfile.local has been created by aws-init.sh which looks something like:
+
+        # Vagrantfile.local
+        ec2_instance_type = "..." # Pick something appropriate for your
+                                  # test. Note that the default m3.medium has
+                                  # a small disk.
+        num_zookeepers = 0
+        num_kafka = 0
+        num_workers = 9
+        ec2_keypair_name = 'kafkatest'
+        ec2_keypair_file = '/home/ubuntu/kafkatest.pem'
+        ec2_security_groups = ['kafkatest']
+        ec2_region = 'us-west-2'
+        ec2_ami = "ami-29ebb519"
+
+* Start up the instances (note we have found bringing up machines in parallel can cause errors on aws):
+
+        $ vagrant up --provider=aws --no-provision --no-parallel && vagrant provision
+
+* Now you should be able to run tests:
+
+        $ cd kafka/tests
+        $ ducktape kafkatest/tests
+
+* To halt your workers without destroying persistent state, run `vagrant halt`. Run `vagrant destroy -f` to destroy all traces of your workers.
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py
new file mode 100644
index 0000000..28d269b
--- /dev/null
+++ b/tests/kafkatest/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/services/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/__init__.py b/tests/kafkatest/services/__init__.py
new file mode 100644
index 0000000..ebc9bb3
--- /dev/null
+++ b/tests/kafkatest/services/__init__.py
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
new file mode 100644
index 0000000..33ef4ea
--- /dev/null
+++ b/tests/kafkatest/services/console_consumer.py
@@ -0,0 +1,146 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.background_thread import BackgroundThreadService
+
+
+def is_int(msg):
+    """Default method used to check whether text pulled from console consumer is a message.
+
+    return int or None
+    """
+    try:
+        return int(msg)
+    except:
+        return None
+
+
+"""
+0.8.2.1 ConsoleConsumer options
+
+The console consumer is a tool that reads data from Kafka and outputs it to standard output.
+Option                                  Description
+------                                  -----------
+--blacklist <blacklist>                 Blacklist of topics to exclude from
+                                          consumption.
+--consumer.config <config file>         Consumer config properties file.
+--csv-reporter-enabled                  If set, the CSV metrics reporter will
+                                          be enabled
+--delete-consumer-offsets               If specified, the consumer path in
+                                          zookeeper is deleted when starting up
+--formatter <class>                     The name of a class to use for
+                                          formatting kafka messages for
+                                          display. (default: kafka.tools.
+                                          DefaultMessageFormatter)
+--from-beginning                        If the consumer does not already have
+                                          an established offset to consume
+                                          from, start with the earliest
+                                          message present in the log rather
+                                          than the latest message.
+--max-messages <Integer: num_messages>  The maximum number of messages to
+                                          consume before exiting. If not set,
+                                          consumption is continual.
+--metrics-dir <metrics dictory>         If csv-reporter-enable is set, and
+                                          this parameter isset, the csv
+                                          metrics will be outputed here
+--property <prop>
+--skip-message-on-error                 If there is an error when processing a
+                                          message, skip it instead of halt.
+--topic <topic>                         The topic id to consume on.
+--whitelist <whitelist>                 Whitelist of topics to include for
+                                          consumption.
+--zookeeper <urls>                      REQUIRED: The connection string for
+                                          the zookeeper connection in the form
+                                          host:port. Multiple URLS can be
+                                          given to allow fail-over.
+"""
+
+
+class ConsoleConsumer(BackgroundThreadService):
+    logs = {
+        "consumer_log": {
+            "path": "/mnt/consumer.log",
+            "collect_default": True}
+        }
+
+    def __init__(self, context, num_nodes, kafka, topic, message_validator=is_int, from_beginning=True, consumer_timeout_ms=None):
+        """
+        Args:
+            context:                    standard context
+            num_nodes:                  number of nodes to use (this should be 1)
+            kafka:                      kafka service
+            topic:                      consume from this topic
+            message_validator:          function which returns message or None
+            from_beginning:             consume from beginning if True, else from the end
+            consumer_timeout_ms:        corresponds to consumer.timeout.ms. consumer process ends if time between
+                                        successively consumed messages exceeds this timeout. Setting this and
+                                        waiting for the consumer to stop is a pretty good way to consume all messages
+                                        in a topic.
+        """
+        super(ConsoleConsumer, self).__init__(context, num_nodes)
+        self.kafka = kafka
+        self.args = {
+            'topic': topic,
+        }
+
+        self.consumer_timeout_ms = consumer_timeout_ms
+
+        self.from_beginning = from_beginning
+        self.message_validator = message_validator
+        self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
+
+    @property
+    def start_cmd(self):
+        args = self.args.copy()
+        args.update({'zk_connect': self.kafka.zk.connect_setting()})
+        cmd = "/opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s --zookeeper %(zk_connect)s" \
+              " --consumer.config /mnt/console_consumer.properties" % args
+
+        if self.from_beginning:
+            cmd += " --from-beginning"
+
+        cmd += " 2>> /mnt/consumer.log | tee -a /mnt/consumer.log &"
+        return cmd
+
+    def _worker(self, idx, node):
+        # form config file
+        if self.consumer_timeout_ms is not None:
+            prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms)
+        else:
+            prop_file = self.render('console_consumer.properties')
+
+        self.logger.info("console_consumer.properties:")
+        self.logger.info(prop_file)
+        node.account.create_file("/mnt/console_consumer.properties", prop_file)
+
+        # Run and capture output
+        cmd = self.start_cmd
+        self.logger.debug("Console consumer %d command: %s", idx, cmd)
+        for line in node.account.ssh_capture(cmd):
+            msg = line.strip()
+            msg = self.message_validator(msg)
+            if msg is not None:
+                self.logger.debug("consumed a message: " + str(msg))
+                self.messages_consumed[idx].append(msg)
+
+    def start_node(self, node):
+        super(ConsoleConsumer, self).start_node(node)
+
+    def stop_node(self, node):
+        node.account.kill_process("java", allow_fail=False)
+
+    def clean_node(self, node):
+        node.account.ssh("rm -rf /mnt/console_consumer.properties /mnt/consumer.log", allow_fail=False)
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/services/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka.py b/tests/kafkatest/services/kafka.py
new file mode 100644
index 0000000..34ec5ef
--- /dev/null
+++ b/tests/kafkatest/services/kafka.py
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.service import Service
+
+import json
+import re
+import signal
+import time
+
+
+class KafkaService(Service):
+
+    logs = {
+        "kafka_log": {
+            "path": "/mnt/kafka.log",
+            "collect_default": True},
+        "kafka_data": {
+            "path": "/mnt/kafka-logs",
+            "collect_default": False}
+    }
+
+    def __init__(self, context, num_nodes, zk, topics=None):
+        """
+        :type context
+        :type zk: ZookeeperService
+        :type topics: dict
+        """
+        super(KafkaService, self).__init__(context, num_nodes)
+        self.zk = zk
+        self.topics = topics
+
+    def start(self):
+        super(KafkaService, self).start()
+
+        # Create topics if necessary
+        if self.topics is not None:
+            for topic, topic_cfg in self.topics.items():
+                if topic_cfg is None:
+                    topic_cfg = {}
+
+                topic_cfg["topic"] = topic
+                self.create_topic(topic_cfg)
+
+    def start_node(self, node):
+        props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node))
+        self.logger.info("kafka.properties:")
+        self.logger.info(props_file)
+        node.account.create_file("/mnt/kafka.properties", props_file)
+
+        cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid"
+        self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
+        node.account.ssh(cmd)
+        time.sleep(5)
+        if len(self.pids(node)) == 0:
+            raise Exception("No process ids recorded on node %s" % str(node))
+
+    def pids(self, node):
+        """Return process ids associated with running processes on the given node."""
+        try:
+            return [pid for pid in node.account.ssh_capture("cat /mnt/kafka.pid", callback=int)]
+        except:
+            return []
+
+    def signal_node(self, node, sig=signal.SIGTERM):
+        pids = self.pids(node)
+        for pid in pids:
+            node.account.signal(pid, sig)
+
+    def signal_leader(self, topic, partition=0, sig=signal.SIGTERM):
+        leader = self.leader(topic, partition)
+        self.signal_node(leader, sig)
+
+    def stop_node(self, node, clean_shutdown=True):
+        pids = self.pids(node)
+        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
+
+        for pid in pids:
+            node.account.signal(pid, sig, allow_fail=False)
+
+        node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False)
+
+    def clean_node(self, node):
+        node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False)
+
+    def create_topic(self, topic_cfg):
+        node = self.nodes[0] # any node is fine here
+        self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg)
+
+        cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %(zk_connect)s --create "\
+            "--topic %(topic)s --partitions %(partitions)d --replication-factor %(replication)d" % {
+                'zk_connect': self.zk.connect_setting(),
+                'topic': topic_cfg.get("topic"),
+                'partitions': topic_cfg.get('partitions', 1),
+                'replication': topic_cfg.get('replication-factor', 1)
+            }
+
+        if "configs" in topic_cfg.keys() and topic_cfg["configs"] is not None:
+            for config_name, config_value in topic_cfg["configs"].items():
+                cmd += " --config %s=%s" % (config_name, str(config_value))
+
+        self.logger.info("Running topic creation command...\n%s" % cmd)
+        node.account.ssh(cmd)
+
+        time.sleep(1)
+        self.logger.info("Checking to see if topic was properly created...\n%s" % cmd)
+        for line in self.describe_topic(topic_cfg["topic"]).split("\n"):
+            self.logger.info(line)
+
+    def describe_topic(self, topic):
+        node = self.nodes[0]
+        cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %s --topic %s --describe" % \
+              (self.zk.connect_setting(), topic)
+        output = ""
+        for line in node.account.ssh_capture(cmd):
+            output += line
+        return output
+
+    def verify_reassign_partitions(self, reassignment):
+        """Run the reassign partitions admin tool in "verify" mode
+        """
+        node = self.nodes[0]
+        json_file = "/tmp/" + str(time.time()) + "_reassign.json"
+
+        # reassignment to json
+        json_str = json.dumps(reassignment)
+        json_str = json.dumps(json_str)
+
+        # create command
+        cmd = "echo %s > %s && " % (json_str, json_file)
+        cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\
+                "--zookeeper %(zk_connect)s "\
+                "--reassignment-json-file %(reassignment_file)s "\
+                "--verify" % {'zk_connect': self.zk.connect_setting(),
+                                'reassignment_file': json_file}
+        cmd += " && sleep 1 && rm -f %s" % json_file
+
+        # send command
+        self.logger.info("Verifying parition reassignment...")
+        self.logger.debug(cmd)
+        output = ""
+        for line in node.account.ssh_capture(cmd):
+            output += line
+
+        self.logger.debug(output)
+
+        if re.match(".*is in progress.*", output) is not None:
+            return False
+
+        return True
+
+    def execute_reassign_partitions(self, reassignment):
+        """Run the reassign partitions admin tool in "verify" mode
+        """
+        node = self.nodes[0]
+        json_file = "/tmp/" + str(time.time()) + "_reassign.json"
+
+        # reassignment to json
+        json_str = json.dumps(reassignment)
+        json_str = json.dumps(json_str)
+
+        # create command
+        cmd = "echo %s > %s && " % (json_str, json_file)
+        cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\
+                "--zookeeper %(zk_connect)s "\
+                "--reassignment-json-file %(reassignment_file)s "\
+                "--execute" % {'zk_connect': self.zk.connect_setting(),
+                                'reassignment_file': json_file}
+        cmd += " && sleep 1 && rm -f %s" % json_file
+
+        # send command
+        self.logger.info("Executing parition reassignment...")
+        self.logger.debug(cmd)
+        output = ""
+        for line in node.account.ssh_capture(cmd):
+            output += line
+
+        self.logger.debug("Verify partition reassignment:")
+        self.logger.debug(output)
+
+    def restart_node(self, node, wait_sec=0, clean_shutdown=True):
+        """Restart the given node, waiting wait_sec in between stopping and starting up again."""
+        self.stop_node(node, clean_shutdown)
+        time.sleep(wait_sec)
+        self.start_node(node)
+
+    def leader(self, topic, partition=0):
+        """ Get the leader replica for the given topic and partition.
+        """
+        cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " \
+              % self.zk.connect_setting()
+        cmd += "get /brokers/topics/%s/partitions/%d/state" % (topic, partition)
+        self.logger.debug(cmd)
+
+        node = self.nodes[0]
+        self.logger.debug("Querying zookeeper to find leader replica for topic %s: \n%s" % (cmd, topic))
+        partition_state = None
+        for line in node.account.ssh_capture(cmd):
+            match = re.match("^({.+})$", line)
+            if match is not None:
+                partition_state = match.groups()[0]
+                break
+
+        if partition_state is None:
+            raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
+
+        partition_state = json.loads(partition_state)
+        self.logger.info(partition_state)
+
+        leader_idx = int(partition_state["leader"])
+        self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx))
+        return self.get_node(leader_idx)
+
+    def bootstrap_servers(self):
+        return ','.join([node.account.hostname + ":9092" for node in self.nodes])
\ No newline at end of file
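
The topic and reassignment helpers are easier to follow with a concrete call sequence in mind. Below is a hedged sketch, not part of the patch: `context` stands for the ducktape test context that the surrounding test already has, and the topic settings are invented.

```
# Hypothetical sketch -- not part of this patch.
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService


def bounce_leader(context):
    zk = ZookeeperService(context, num_nodes=1)
    kafka = KafkaService(context, num_nodes=3, zk=zk,
                         topics={"test-topic": {"partitions": 6,
                                                "replication-factor": 3,
                                                "configs": {"min.insync.replicas": 2}}})
    zk.start()
    kafka.start()  # start() also creates the topics passed to the constructor

    # leader() reads the partition state znode and maps the broker id to a node.
    leader = kafka.leader("test-topic", partition=0)

    # Hard-bounce the leader: SIGKILL instead of SIGTERM, then start it again.
    kafka.restart_node(leader, wait_sec=5, clean_shutdown=False)
```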

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/services/performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance.py b/tests/kafkatest/services/performance.py
new file mode 100644
index 0000000..65c1a4d
--- /dev/null
+++ b/tests/kafkatest/services/performance.py
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.background_thread import BackgroundThreadService
+
+
+class PerformanceService(BackgroundThreadService):
+    def __init__(self, context, num_nodes):
+        super(PerformanceService, self).__init__(context, num_nodes)
+        self.results = [None] * self.num_nodes
+        self.stats = [[] for x in range(self.num_nodes)]
+
+
+class ProducerPerformanceService(PerformanceService):
+    def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
+        super(ProducerPerformanceService, self).__init__(context, num_nodes)
+        self.kafka = kafka
+        self.args = {
+            'topic': topic,
+            'num_records': num_records,
+            'record_size': record_size,
+            'throughput': throughput
+        }
+        self.settings = settings
+        self.intermediate_stats = intermediate_stats
+
+    def _worker(self, idx, node):
+        args = self.args.copy()
+        args.update({'bootstrap_servers': self.kafka.bootstrap_servers()})
+        cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\
+              "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s" % args
+
+        for key,value in self.settings.items():
+            cmd += " %s=%s" % (str(key), str(value))
+        self.logger.debug("Producer performance %d command: %s", idx, cmd)
+
+        def parse_stats(line):
+            parts = line.split(',')
+            return {
+                'records': int(parts[0].split()[0]),
+                'records_per_sec': float(parts[1].split()[0]),
+                'mbps': float(parts[1].split('(')[1].split()[0]),
+                'latency_avg_ms': float(parts[2].split()[0]),
+                'latency_max_ms': float(parts[3].split()[0]),
+                'latency_50th_ms': float(parts[4].split()[0]),
+                'latency_95th_ms': float(parts[5].split()[0]),
+                'latency_99th_ms': float(parts[6].split()[0]),
+                'latency_999th_ms': float(parts[7].split()[0]),
+            }
+        last = None
+        for line in node.account.ssh_capture(cmd):
+            self.logger.debug("Producer performance %d: %s", idx, line.strip())
+            if self.intermediate_stats:
+                try:
+                    self.stats[idx-1].append(parse_stats(line))
+                except:
+                    # Sometimes there are extraneous log messages
+                    pass
+            last = line
+        try:
+            self.results[idx-1] = parse_stats(last)
+        except:
+            self.logger.error("Bad last line: %s", last)
+
+
+class ConsumerPerformanceService(PerformanceService):
+    def __init__(self, context, num_nodes, kafka, topic, num_records, throughput, threads=1, settings={}):
+        super(ConsumerPerformanceService, self).__init__(context, num_nodes)
+        self.kafka = kafka
+        self.args = {
+            'topic': topic,
+            'num_records': num_records,
+            'throughput': throughput,
+            'threads': threads,
+        }
+        self.settings = settings
+
+    def _worker(self, idx, node):
+        args = self.args.copy()
+        args.update({'zk_connect': self.kafka.zk.connect_setting()})
+        cmd = "/opt/kafka/bin/kafka-consumer-perf-test.sh "\
+              "--topic %(topic)s --messages %(num_records)d --zookeeper %(zk_connect)s" % args
+        for key,value in self.settings.items():
+            cmd += " %s=%s" % (str(key), str(value))
+        self.logger.debug("Consumer performance %d command: %s", idx, cmd)
+        last = None
+        for line in node.account.ssh_capture(cmd):
+            self.logger.debug("Consumer performance %d: %s", idx, line.strip())
+            last = line
+        # Parse and save the last line's information
+        parts = last.split(',')
+
+        self.results[idx-1] = {
+            'total_mb': float(parts[2]),
+            'mbps': float(parts[3]),
+            'records_per_sec': float(parts[5]),
+        }
+
+
+class EndToEndLatencyService(PerformanceService):
+    def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
+        super(EndToEndLatencyService, self).__init__(context, num_nodes)
+        self.kafka = kafka
+        self.args = {
+            'topic': topic,
+            'num_records': num_records,
+            'consumer_fetch_max_wait': consumer_fetch_max_wait,
+            'acks': acks
+        }
+
+    def _worker(self, idx, node):
+        args = self.args.copy()
+        args.update({
+            'zk_connect': self.kafka.zk.connect_setting(),
+            'bootstrap_servers': self.kafka.bootstrap_servers(),
+        })
+        cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
+              "%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d "\
+              "%(consumer_fetch_max_wait)d %(acks)d" % args
+        self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
+        results = {}
+        for line in node.account.ssh_capture(cmd):
+            self.logger.debug("End-to-end latency %d: %s", idx, line.strip())
+            if line.startswith("Avg latency:"):
+                results['latency_avg_ms'] = float(line.split()[2])
+            if line.startswith("Percentiles"):
+                results['latency_50th_ms'] = float(line.split()[3][:-1])
+                results['latency_99th_ms'] = float(line.split()[6][:-1])
+                results['latency_999th_ms'] = float(line.split()[9])
+        self.results[idx-1] = results
+
+
+def parse_performance_output(summary):
+    parts = summary.split(',')
+    results = {
+        'records': int(parts[0].split()[0]),
+        'records_per_sec': float(parts[1].split()[0]),
+        'mbps': float(parts[1].split('(')[1].split()[0]),
+        'latency_avg_ms': float(parts[2].split()[0]),
+        'latency_max_ms': float(parts[3].split()[0]),
+        'latency_50th_ms': float(parts[4].split()[0]),
+        'latency_95th_ms': float(parts[5].split()[0]),
+        'latency_99th_ms': float(parts[6].split()[0]),
+        'latency_999th_ms': float(parts[7].split()[0]),
+    }
+    # To provide compatibility with ConsumerPerformanceService
+    results['total_mb'] = results['mbps'] * (results['records'] / results['records_per_sec'])
+    results['rate_mbps'] = results['mbps']
+    results['rate_mps'] = results['records_per_sec']
+
+    return results
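
To make the summary line these parsers expect concrete, here is a small example of feeding a ProducerPerformance-style line through parse_performance_output; the sample line is invented for illustration rather than captured from the tool:

```
# Hypothetical example -- the summary line is invented for illustration.
from kafkatest.services.performance import parse_performance_output

summary = ("50000 records sent, 25000.0 records/sec (2.38 MB/sec), "
           "4.5 ms avg latency, 120.0 ms max latency, "
           "3 ms 50th, 10 ms 95th, 45 ms 99th, 110 ms 99.9th.")

results = parse_performance_output(summary)

assert results['records'] == 50000
assert results['records_per_sec'] == 25000.0
assert results['mbps'] == 2.38
# total_mb is derived as mbps * (records / records_per_sec) = 2.38 * 2.0
assert results['total_mb'] == 4.76
```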

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/services/templates/console_consumer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/console_consumer.properties b/tests/kafkatest/services/templates/console_consumer.properties
new file mode 100644
index 0000000..63782fc
--- /dev/null
+++ b/tests/kafkatest/services/templates/console_consumer.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+{% if consumer_timeout_ms is defined %}
+consumer.timeout.ms={{ consumer_timeout_ms }}
+{% endif %}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/services/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/kafka.properties b/tests/kafkatest/services/templates/kafka.properties
new file mode 100644
index 0000000..db1077a
--- /dev/null
+++ b/tests/kafkatest/services/templates/kafka.properties
@@ -0,0 +1,121 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id={{ broker_id }}
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9092
+
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+#host.name=localhost
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured.  Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+advertised.host.name={{ node.account.hostname }}
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=3
+ 
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=65536
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma-separated list of directories under which to store log files
+log.dirs=/mnt/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk. 
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according 
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect={{ zk.connect_setting() }}
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=2000

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/services/templates/zookeeper.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/zookeeper.properties b/tests/kafkatest/services/templates/zookeeper.properties
new file mode 100644
index 0000000..e66c53f
--- /dev/null
+++ b/tests/kafkatest/services/templates/zookeeper.properties
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+dataDir=/mnt/zookeeper
+clientPort=2181
+maxClientCnxns=0
+initLimit=5
+syncLimit=2
+quorumListenOnAllIPs=true
+{% for node in nodes %}
+server.{{ loop.index }}={{ node.account.hostname }}:2888:3888
+{% endfor %}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
new file mode 100644
index 0000000..cca8227
--- /dev/null
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.background_thread import BackgroundThreadService
+
+import json
+
+
+class VerifiableProducer(BackgroundThreadService):
+
+    logs = {
+        "producer_log": {
+            "path": "/mnt/producer.log",
+            "collect_default": False}
+    }
+
+    def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000):
+        super(VerifiableProducer, self).__init__(context, num_nodes)
+
+        self.kafka = kafka
+        self.topic = topic
+        self.max_messages = max_messages
+        self.throughput = throughput
+
+        self.acked_values = []
+        self.not_acked_values = []
+
+    def _worker(self, idx, node):
+        cmd = self.start_cmd
+        self.logger.debug("VerifiableProducer %d command: %s" % (idx, cmd))
+
+        for line in node.account.ssh_capture(cmd):
+            line = line.strip()
+
+            data = self.try_parse_json(line)
+            if data is not None:
+
+                with self.lock:
+                    if data["name"] == "producer_send_error":
+                        data["node"] = idx
+                        self.not_acked_values.append(int(data["value"]))
+
+                    elif data["name"] == "producer_send_success":
+                        self.acked_values.append(int(data["value"]))
+
+    @property
+    def start_cmd(self):
+        cmd = "/opt/kafka/bin/kafka-verifiable-producer.sh" \
+              " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
+        if self.max_messages > 0:
+            cmd += " --max-messages %s" % str(self.max_messages)
+        if self.throughput > 0:
+            cmd += " --throughput %s" % str(self.throughput)
+
+        cmd += " 2>> /mnt/producer.log | tee -a /mnt/producer.log &"
+        return cmd
+
+    @property
+    def acked(self):
+        with self.lock:
+            return self.acked_values
+
+    @property
+    def not_acked(self):
+        with self.lock:
+            return self.not_acked_values
+
+    @property
+    def num_acked(self):
+        with self.lock:
+            return len(self.acked_values)
+
+    @property
+    def num_not_acked(self):
+        with self.lock:
+            return len(self.not_acked_values)
+
+    def stop_node(self, node):
+        node.account.kill_process("VerifiableProducer", allow_fail=False)
+        # block until the corresponding thread exits
+        if len(self.worker_threads) >= self.idx(node):
+            # Need to guard this because stop is preemptively called before the worker threads are added and started
+            self.worker_threads[self.idx(node) - 1].join()
+
+    def clean_node(self, node):
+        node.account.ssh("rm -rf /mnt/producer.log", allow_fail=False)
+
+    def try_parse_json(self, string):
+        """Try to parse a string as json. Return None if not parseable."""
+        try:
+            record = json.loads(string)
+            return record
+        except ValueError:
+            self.logger.debug("Could not parse as json: %s" % str(string))
+            return None
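
The acked/not_acked bookkeeping above is what correctness tests can assert against once producing has finished. A hedged sketch of that pattern follows; `producer` and `consumer` are assumed to be a started VerifiableProducer and a finished ConsoleConsumer from the surrounding test, and `wait()` is assumed to block until the background worker drains the producer's output:

```
# Hypothetical validation sketch -- not part of this patch.
producer.wait()

acked = set(producer.acked)  # values the producer saw acknowledged

# The console consumer yields raw lines, so normalize them before comparing.
consumed = set(int(msg) for msg in consumer.messages_consumed[1])

# Every acknowledged value should eventually be consumed; values that appear
# only in not_acked may legitimately be missing after a broker failure.
missing = acked - consumed
assert not missing, "acked messages never consumed: %s" % str(sorted(missing))
```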

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/services/zookeeper.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
new file mode 100644
index 0000000..56f4606
--- /dev/null
+++ b/tests/kafkatest/services/zookeeper.py
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.services.service import Service
+
+import time
+
+
+class ZookeeperService(Service):
+
+    logs = {
+        "zk_log": {
+            "path": "/mnt/zk.log",
+            "collect_default": True}
+    }
+
+    def __init__(self, context, num_nodes):
+        """
+        :type context
+        """
+        super(ZookeeperService, self).__init__(context, num_nodes)
+
+    def start_node(self, node):
+        idx = self.idx(node)
+        self.logger.info("Starting ZK node %d on %s", idx, node.account.hostname)
+
+        node.account.ssh("mkdir -p /mnt/zookeeper")
+        node.account.ssh("echo %d > /mnt/zookeeper/myid" % idx)
+
+        config_file = self.render('zookeeper.properties')
+        self.logger.info("zookeeper.properties:")
+        self.logger.info(config_file)
+        node.account.create_file("/mnt/zookeeper.properties", config_file)
+
+        node.account.ssh(
+            "/opt/kafka/bin/zookeeper-server-start.sh /mnt/zookeeper.properties 1>> %(path)s 2>> %(path)s &"
+            % self.logs["zk_log"])
+
+        time.sleep(5)  # give it some time to start
+
+    def stop_node(self, node):
+        idx = self.idx(node)
+        self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
+        node.account.kill_process("zookeeper", allow_fail=False)
+
+    def clean_node(self, node):
+        self.logger.info("Cleaning ZK node %d on %s", self.idx(node), node.account.hostname)
+        node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=False)
+
+    def connect_setting(self):
+        return ','.join([node.account.hostname + ':2181' for node in self.nodes])
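
For completeness, connect_setting() simply joins each node's hostname with the fixed client port, so brokers and command-line tools can be pointed at the quorum as sketched below; `context` and the hostnames in the comment are invented for illustration:

```
# Hypothetical -- not part of this patch.
from kafkatest.services.zookeeper import ZookeeperService

zk = ZookeeperService(context, num_nodes=3)
zk.start()
print(zk.connect_setting())  # e.g. "worker1:2181,worker2:2181,worker3:2181"
```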

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/__init__.py b/tests/kafkatest/tests/__init__.py
new file mode 100644
index 0000000..ebc9bb3
--- /dev/null
+++ b/tests/kafkatest/tests/__init__.py
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults