You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/27 22:53:07 UTC

kafka git commit: KAFKA-2377: Add basic system test for copycat using source and sink file connectors.

Repository: kafka
Updated Branches:
  refs/heads/trunk 492bfdfa8 -> 2fa8c7485


KAFKA-2377: Add basic system test for copycat using source and sink file connectors.

Tests standalone mode by running separate source and sink connectors, catting
data into the source file, and validating the output in the sink file. Restarts
the service to verify that clean restarts will result in tasks resuming where
they left off.

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Geoff Anderson, Gwen Shapira

Closes #150 from ewencp/kafka-2377-copycat-system-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2fa8c748
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2fa8c748
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2fa8c748

Branch: refs/heads/trunk
Commit: 2fa8c748537a86a8b81b43ab7df060b6e61f5085
Parents: 492bfdf
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Thu Aug 27 13:53:00 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Thu Aug 27 13:53:00 2015 -0700

----------------------------------------------------------------------
 .../kafka/copycat/file/FileStreamSinkTask.java  |  6 +-
 .../copycat/file/FileStreamSourceTask.java      | 33 ++++---
 .../copycat/file/FileStreamSourceTaskTest.java  |  9 +-
 .../kafka/copycat/runtime/WorkerSinkTask.java   |  1 +
 .../sanity_checks/test_console_consumer.py      |  4 -
 tests/kafkatest/services/copycat.py             | 95 ++++++++++++++++++++
 tests/kafkatest/services/kafka.py               |  8 +-
 tests/kafkatest/tests/copycat_test.py           | 69 ++++++++++++++
 tests/kafkatest/tests/replication_test.py       |  3 -
 .../templates/copycat-file-sink.properties      | 20 +++++
 .../templates/copycat-file-source.properties    | 20 +++++
 .../templates/copycat-standalone.properties     | 25 ++++++
 tests/setup.py                                  |  2 +-
 13 files changed, 264 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2fa8c748/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
index 5ef657f..870a952 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
@@ -24,8 +24,8 @@ import org.apache.kafka.copycat.sink.SinkTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.util.Collection;
 import java.util.Map;
@@ -54,9 +54,9 @@ public class FileStreamSinkTask extends SinkTask {
             outputStream = System.out;
         } else {
             try {
-                outputStream = new PrintStream(new File(filename));
+                outputStream = new PrintStream(new FileOutputStream(filename, true));
             } catch (FileNotFoundException e) {
-                throw new CopycatException("Couldn't find or create file for FileStreamSinkTask: {}", e);
+                throw new CopycatException("Couldn't find or create file for FileStreamSinkTask", e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fa8c748/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
index e411cad..a841386 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
@@ -51,11 +51,19 @@ public class FileStreamSourceTask extends SourceTask {
     @Override
     public void start(Properties props) {
         filename = props.getProperty(FileStreamSourceConnector.FILE_CONFIG);
-        if (filename == null) {
+        if (filename == null || filename.isEmpty()) {
             stream = System.in;
             // Tracking offset for stdin doesn't make sense
             streamOffset = null;
-        } else {
+        }
+        topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG);
+        if (topic == null)
+            throw new CopycatException("ConsoleSourceTask config missing topic setting");
+    }
+
+    @Override
+    public List<SourceRecord> poll() throws InterruptedException {
+        if (stream == null) {
             try {
                 stream = new FileInputStream(filename);
                 SchemaAndValue offsetWithSchema = context.offsetStorageReader().offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, filename));
@@ -81,18 +89,16 @@ public class FileStreamSourceTask extends SourceTask {
                 } else {
                     streamOffset = 0L;
                 }
+                reader = new BufferedReader(new InputStreamReader(stream));
             } catch (FileNotFoundException e) {
-                throw new CopycatException("Couldn't find file for FileStreamSourceTask: {}", e);
+                log.warn("Couldn't find file for FileStreamSourceTask, sleeping to wait for it to be created");
+                synchronized (this) {
+                    this.wait(1000);
+                }
+                return null;
             }
         }
-        topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG);
-        if (topic == null)
-            throw new CopycatException("ConsoleSourceTask config missing topic setting");
-        reader = new BufferedReader(new InputStreamReader(stream));
-    }
 
-    @Override
-    public List<SourceRecord> poll() throws InterruptedException {
         // Unfortunately we can't just use readLine() because it blocks in an uninterruptible way.
         // Instead we have to manage splitting lines ourselves, using simple backoff when no new data
         // is available.
@@ -132,7 +138,9 @@ public class FileStreamSourceTask extends SourceTask {
             }
 
             if (nread <= 0)
-                Thread.sleep(1);
+                synchronized (this) {
+                    this.wait(1000);
+                }
 
             return records;
         } catch (IOException e) {
@@ -182,8 +190,7 @@ public class FileStreamSourceTask extends SourceTask {
             } catch (IOException e) {
                 log.error("Failed to close ConsoleSourceTask stream: ", e);
             }
-            reader = null;
-            stream = null;
+            this.notify();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fa8c748/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
index ee45493..ab89b6a 100644
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
@@ -118,18 +118,19 @@ public class FileStreamSourceTaskTest {
     }
 
     @Test(expected = CopycatException.class)
-    public void testMissingTopic() {
-        expectOffsetLookupReturnNone();
+    public void testMissingTopic() throws InterruptedException {
         replay();
 
         config.remove(FileStreamSourceConnector.TOPIC_CONFIG);
         task.start(config);
     }
 
-    @Test(expected = CopycatException.class)
-    public void testInvalidFile() {
+    public void testInvalidFile() throws InterruptedException {
         config.setProperty(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename");
         task.start(config);
+        // Currently the task retries indefinitely if the file isn't found, but shouldn't return any data.
+        for (int i = 0; i < 100; i++)
+            assertEquals(null, task.poll());
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fa8c748/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index a1d7fab..dfb1f96 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -120,6 +120,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
      * the write commit. This should only be invoked by the WorkerSinkTaskThread.
      **/
     public void commitOffsets(long now, boolean sync, final int seqno, boolean flush) {
+        log.info("{} Committing offsets", this);
         HashMap<TopicPartition, Long> offsets = new HashMap<>();
         for (TopicPartition tp : consumer.assignment()) {
             offsets.put(tp, consumer.position(tp));

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fa8c748/tests/kafkatest/sanity_checks/test_console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 3e523e1..5b061aa 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -74,7 +74,3 @@ class ConsoleConsumerTest(Test):
         assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0
 
         self.consumer.stop_node(node)
-
-
-
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fa8c748/tests/kafkatest/services/copycat.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/copycat.py b/tests/kafkatest/services/copycat.py
new file mode 100644
index 0000000..7452b85
--- /dev/null
+++ b/tests/kafkatest/services/copycat.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.service import Service
+from ducktape.utils.util import wait_until
+import subprocess, signal
+
+
+class CopycatStandaloneService(Service):
+    """Runs Copycat in standalone mode."""
+
+    logs = {
+        "kafka_log": {
+            "path": "/mnt/copycat.log",
+            "collect_default": True},
+    }
+
+    def __init__(self, context, kafka, files):
+        super(CopycatStandaloneService, self).__init__(context, 1)
+        self.kafka = kafka
+        self.files = files
+
+    def set_configs(self, config_template, connector_config_template):
+        """
+        Set configurations for the standalone worker and the connector to run on
+        it. These are not provided in the constructor because at least the
+        standalone worker config generally needs access to ZK/Kafka services to
+        create the configuration.
+        """
+        self.config_template = config_template
+        self.connector_config_template = connector_config_template
+
+    # For convenience since this service only makes sense with a single node
+    @property
+    def node(self):
+        return self.nodes[0]
+
+    def start(self):
+        super(CopycatStandaloneService, self).start()
+
+    def start_node(self, node):
+        node.account.create_file("/mnt/copycat.properties", self.config_template)
+        node.account.create_file("/mnt/copycat-connector.properties", self.connector_config_template)
+
+        self.logger.info("Starting Copycat standalone process")
+        with node.account.monitor_log("/mnt/copycat.log") as monitor:
+            node.account.ssh("/opt/kafka/bin/copycat-standalone.sh /mnt/copycat.properties /mnt/copycat-connector.properties " +
+                             "1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! > /mnt/copycat.pid")
+            monitor.wait_until('Copycat started', timeout_sec=10, err_msg="Never saw message indicating Copycat finished startup")
+
+        if len(self.pids()) == 0:
+            raise RuntimeError("No process ids recorded")
+
+    def pids(self):
+        """Return process ids for Copycat processes."""
+        try:
+            return [pid for pid in self.node.account.ssh_capture("cat /mnt/copycat.pid", callback=int)]
+        except:
+            return []
+
+    def stop_node(self, node, clean_shutdown=True):
+        pids = self.pids()
+        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
+
+        for pid in pids:
+            self.node.account.signal(pid, sig, allow_fail=False)
+        for pid in pids:
+            wait_until(lambda: not self.node.account.alive(pid), timeout_sec=10, err_msg="Copycat standalone process took too long to exit")
+
+        node.account.ssh("rm -f /mnt/copycat.pid", allow_fail=False)
+
+    def restart(self):
+        # We don't want to do any clean up here, just restart the process
+        self.stop_node(self.node)
+        self.start_node(self.node)
+
+    def clean_node(self, node):
+        if len(self.pids()) > 0:
+            self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
+                             (self.__class__.__name__, node.account))
+        for pid in self.pids():
+            self.node.account.signal(pid, signal.SIGKILL, allow_fail=False)
+        node.account.ssh("rm -rf /mnt/copycat.pid /mnt/copycat.log /mnt/copycat.properties /mnt/copycat-connector.properties " + " ".join(self.files), allow_fail=False)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fa8c748/tests/kafkatest/services/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka.py b/tests/kafkatest/services/kafka.py
index 76f9cf6..5ff8047 100644
--- a/tests/kafkatest/services/kafka.py
+++ b/tests/kafkatest/services/kafka.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from ducktape.services.service import Service
+from ducktape.utils.util import wait_until
 
 import json
 import re
@@ -62,8 +63,9 @@ class KafkaService(Service):
 
         cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid"
         self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
-        node.account.ssh(cmd)
-        time.sleep(5)
+        with node.account.monitor_log("/mnt/kafka.log") as monitor:
+            node.account.ssh(cmd)
+            monitor.wait_until("Kafka Server.*started", timeout_sec=30, err_msg="Kafka server didn't finish startup")
         if len(self.pids(node)) == 0:
             raise Exception("No process ids recorded on node %s" % str(node))
 
@@ -225,4 +227,4 @@ class KafkaService(Service):
         return self.get_node(leader_idx)
 
     def bootstrap_servers(self):
-        return ','.join([node.account.hostname + ":9092" for node in self.nodes])
\ No newline at end of file
+        return ','.join([node.account.hostname + ":9092" for node in self.nodes])

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fa8c748/tests/kafkatest/tests/copycat_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/copycat_test.py b/tests/kafkatest/tests/copycat_test.py
new file mode 100644
index 0000000..344f7ef
--- /dev/null
+++ b/tests/kafkatest/tests/copycat_test.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.copycat import CopycatStandaloneService
+from ducktape.utils.util import wait_until
+import hashlib, subprocess
+
+class CopycatStandaloneFileTest(KafkaTest):
+    """
+    Simple test of Copycat that produces data from a file in one Copycat
+    standalone process and consumes it on another, validating the output is
+    identical to the input.
+    """
+
+    INPUT_FILE = "/mnt/copycat.input"
+    OUTPUT_FILE = "/mnt/copycat.output"
+
+    OFFSETS_FILE = "/mnt/copycat.offsets"
+
+    FIRST_INPUT = "foo\nbar\nbaz\n"
+    SECOND_INPUT = "razz\nma\ntazz\n"
+
+    def __init__(self, test_context):
+        super(CopycatStandaloneFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+            'test' : { 'partitions': 1, 'replication-factor': 1 }
+        })
+
+        self.source = CopycatStandaloneService(test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
+        self.sink = CopycatStandaloneService(test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
+
+    def test_file_source_and_sink(self):
+        # These need to be set
+        self.source.set_configs(self.render("copycat-standalone.properties"), self.render("copycat-file-source.properties"))
+        self.sink.set_configs(self.render("copycat-standalone.properties"), self.render("copycat-file-sink.properties"))
+
+        self.source.start()
+        self.sink.start()
+
+        # Generating data on the source node should generate new records and create new output on the sink node
+        self.source.node.account.ssh("echo -e -n " + repr(self.FIRST_INPUT) + " >> " + self.INPUT_FILE)
+        wait_until(lambda: self.validate_output(self.FIRST_INPUT), timeout_sec=60, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
+
+        # Restarting both should result in them picking up where they left off,
+        # only processing new data.
+        self.source.restart()
+        self.sink.restart()
+
+        self.source.node.account.ssh("echo -e -n " + repr(self.SECOND_INPUT) + " >> " + self.INPUT_FILE)
+        wait_until(lambda: self.validate_output(self.FIRST_INPUT + self.SECOND_INPUT), timeout_sec=60, err_msg="Sink output file never converged to the same state as the input file")
+
+    def validate_output(self, value):
+        try:
+            output_hash = list(self.sink.node.account.ssh_capture("md5sum " + self.OUTPUT_FILE))[0].strip().split()[0]
+            return output_hash == hashlib.md5(value).hexdigest()
+        except subprocess.CalledProcessError:
+            return False

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fa8c748/tests/kafkatest/tests/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py
index 755fb42..a83769b 100644
--- a/tests/kafkatest/tests/replication_test.py
+++ b/tests/kafkatest/tests/replication_test.py
@@ -160,6 +160,3 @@ class ReplicationTest(Test):
 
     def test_hard_bounce(self):
         self.run_with_failure(self.hard_bounce)
-
-
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fa8c748/tests/kafkatest/tests/templates/copycat-file-sink.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-file-sink.properties b/tests/kafkatest/tests/templates/copycat-file-sink.properties
new file mode 100644
index 0000000..c7865a6
--- /dev/null
+++ b/tests/kafkatest/tests/templates/copycat-file-sink.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-file-sink
+connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector
+tasks.max=1
+file={{ OUTPUT_FILE }}
+topics=test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fa8c748/tests/kafkatest/tests/templates/copycat-file-source.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-file-source.properties b/tests/kafkatest/tests/templates/copycat-file-source.properties
new file mode 100644
index 0000000..8612ed7
--- /dev/null
+++ b/tests/kafkatest/tests/templates/copycat-file-source.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-file-source
+connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector
+tasks.max=1
+file={{ INPUT_FILE }}
+topic=test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fa8c748/tests/kafkatest/tests/templates/copycat-standalone.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-standalone.properties b/tests/kafkatest/tests/templates/copycat-standalone.properties
new file mode 100644
index 0000000..5ffb487
--- /dev/null
+++ b/tests/kafkatest/tests/templates/copycat-standalone.properties
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bootstrap.servers={{ kafka.bootstrap_servers() }}
+
+key.converter=org.apache.kafka.copycat.json.JsonConverter
+value.converter=org.apache.kafka.copycat.json.JsonConverter
+key.serializer=org.apache.kafka.copycat.json.JsonSerializer
+value.serializer=org.apache.kafka.copycat.json.JsonSerializer
+key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
+value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
+
+offset.storage.file.filename={{ OFFSETS_FILE }}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fa8c748/tests/setup.py
----------------------------------------------------------------------
diff --git a/tests/setup.py b/tests/setup.py
index a2fa71a..2a53880 100644
--- a/tests/setup.py
+++ b/tests/setup.py
@@ -23,5 +23,5 @@ setup(name="kafkatest",
       platforms=["any"], 
       license="apache2.0",
       packages=find_packages(),
-      requires=["ducktape(==0.3.0)"]
+      requires=["ducktape(==0.3.1)"]
       )