Posted to commits@kafka.apache.org by jg...@apache.org on 2017/06/01 17:26:52 UTC

kafka git commit: KAFKA-5281; System tests for transactions

Repository: kafka
Updated Branches:
  refs/heads/trunk 8e8b3c565 -> 1959835d9


KAFKA-5281; System tests for transactions

Author: Apurva Mehta <ap...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3149 from apurvam/KAFKA-5281-transactions-system-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1959835d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1959835d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1959835d

Branch: refs/heads/trunk
Commit: 1959835d9e148f0eb6407b36ff96b334d5e785cb
Parents: 8e8b3c5
Author: Apurva Mehta <ap...@confluent.io>
Authored: Thu Jun 1 10:25:29 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Jun 1 10:25:29 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/TransactionManager.java  |   4 +-
 .../common/requests/OffsetCommitResponse.java   |   1 +
 .../requests/TxnOffsetCommitResponse.java       |   1 +
 ...nsactionMarkerRequestCompletionHandler.scala |   3 +-
 .../scala/kafka/tools/ConsoleConsumer.scala     |  12 +-
 tests/kafkatest/services/console_consumer.py    |   9 +-
 .../services/transactional_message_copier.py    | 183 ++++++++++++
 tests/kafkatest/tests/core/transactions_test.py | 207 +++++++++++++
 tests/kafkatest/version.py                      |   5 +
 .../kafka/tools/TransactionalMessageCopier.java | 287 +++++++++++++++++++
 10 files changed, 706 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 221816c..11068a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -928,7 +928,9 @@ public class TransactionManager {
                 Errors error = entry.getValue();
                 if (error == Errors.NONE) {
                     pendingTxnOffsetCommits.remove(topicPartition);
-                } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
+                } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
+                        || error == Errors.NOT_COORDINATOR
+                        || error == Errors.REQUEST_TIMED_OUT) {
                     hadFailure = true;
                     if (!coordinatorReloaded) {
                         coordinatorReloaded = true;
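
The hunk above widens the set of TxnOffsetCommit errors the producer treats as retriable: a REQUEST_TIMED_OUT response is now handled like a coordinator hiccup, so the pending offset commits stay queued, the coordinator is re-looked-up (at most once per response), and the commit is retried instead of failing the transaction. A hedged, standalone sketch of just the predicate this change applies (not the client's internal structure):

    import org.apache.kafka.common.protocol.Errors;

    public class RetriableTxnOffsetCommitCheck {
        // After this patch, REQUEST_TIMED_OUT joins the coordinator errors as retriable.
        static boolean isRetriable(Errors error) {
            return error == Errors.COORDINATOR_NOT_AVAILABLE
                    || error == Errors.NOT_COORDINATOR
                    || error == Errors.REQUEST_TIMED_OUT;
        }

        public static void main(String[] args) {
            System.out.println(isRetriable(Errors.REQUEST_TIMED_OUT)); // prints: true
        }
    }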

http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 06e5608..782ffa5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -44,6 +44,7 @@ public class OffsetCommitResponse extends AbstractResponse {
      * Possible error codes:
      *
      * UNKNOWN_TOPIC_OR_PARTITION (3)
+     * REQUEST_TIMED_OUT (7)
      * OFFSET_METADATA_TOO_LARGE (12)
      * COORDINATOR_LOAD_IN_PROGRESS (14)
      * GROUP_COORDINATOR_NOT_AVAILABLE (15)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index e7b349c..4c0f010 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -42,6 +42,7 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
     //   GroupAuthorizationFailed
     //   InvalidCommitOffsetSize
     //   TransactionalIdAuthorizationFailed
+    //   RequestTimedOut
 
     private final Map<TopicPartition, Errors> errors;
     private final int throttleTimeMs;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 5fa6035..da40001 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -139,7 +139,8 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
                     case Errors.UNKNOWN_TOPIC_OR_PARTITION |
                          Errors.NOT_LEADER_FOR_PARTITION |
                          Errors.NOT_ENOUGH_REPLICAS |
-                         Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => // these are retriable errors
+                         Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND |
+                         Errors.REQUEST_TIMED_OUT => // these are retriable errors
 
                       info(s"Sending $transactionalId's transaction marker for partition $topicPartition has failed with error ${error.exceptionName}, retrying " +
                         s"with current coordinator epoch ${epochAndMetadata.coordinatorEpoch}")

http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 193a344..335c724 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -200,7 +200,7 @@ object ConsoleConsumer extends Logging {
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-
+    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.isolationLevel)
     props
   }
 
@@ -264,7 +264,7 @@ object ConsoleConsumer extends Logging {
       "skip it instead of halt.")
     val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
     val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
-      "set, the csv metrics will be outputed here")
+      "set, the csv metrics will be output here")
       .withRequiredArg
       .describedAs("metrics directory")
       .ofType(classOf[java.lang.String])
@@ -284,6 +284,13 @@ object ConsoleConsumer extends Logging {
     val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
                                                        "Log lifecycle events of the consumer in addition to logging consumed " +
                                                        "messages. (This is specific for system tests.)")
+    val isolationLevelOpt = parser.accepts("isolation-level",
+        "Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted " +
+        "to read all messages.")
+      .withRequiredArg()
+      .ofType(classOf[String])
+      .defaultsTo("read_uncommitted")
+
 
     if (args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")
@@ -314,6 +321,7 @@ object ConsoleConsumer extends Logging {
     val bootstrapServer = options.valueOf(bootstrapServerOpt)
     val keyDeserializer = options.valueOf(keyDeserializerOpt)
     val valueDeserializer = options.valueOf(valueDeserializerOpt)
+    val isolationLevel = options.valueOf(isolationLevelOpt).toString
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
 
     if (keyDeserializer != null && !keyDeserializer.isEmpty) {
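
The new --isolation-level flag is passed straight through to the consumer's isolation.level property (see the props.put of ConsumerConfig.ISOLATION_LEVEL_CONFIG above). For readers wiring this up outside the console consumer, a minimal sketch of the equivalent programmatic configuration follows; the broker address and group id are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ReadCommittedConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");              // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            // read_committed filters out records from aborted and still-open transactions;
            // read_uncommitted (the flag's default) returns everything.
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // subscribe/poll as usual; only committed transactional data is returned.
            }
        }
    }

On the command line, the same setting looks roughly like:

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output-topic --from-beginning --isolation-level read_committed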

http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index d55d012..6fad674 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -21,7 +21,7 @@ from ducktape.cluster.remoteaccount import RemoteCommandError
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.monitor.jmx import JmxMixin
-from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0
+from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0
 
 """
 0.8.2.1 ConsoleConsumer options
@@ -97,7 +97,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
     def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=True,
                  message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=DEV_BRANCH,
                  client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None,
-                 enable_systest_events=False, stop_timeout_sec=15, print_timestamp=False):
+                 enable_systest_events=False, stop_timeout_sec=15, print_timestamp=False,
+                 isolation_level="read_uncommitted"):
         """
         Args:
             context:                    standard context
@@ -117,6 +118,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
             stop_timeout_sec            After stopping a node, wait up to stop_timeout_sec for the node to stop,
                                         and the corresponding background thread to finish successfully.
             print_timestamp             if True, print each message's timestamp as well
+            isolation_level             How to handle transactional messages. Either "read_uncommitted" (the default) or "read_committed".
         """
         JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [])
         BackgroundThreadService.__init__(self, context, num_nodes)
@@ -140,6 +142,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         self.log_level = "TRACE"
         self.stop_timeout_sec = stop_timeout_sec
 
+        self.isolation_level = isolation_level
         self.enable_systest_events = enable_systest_events
         if self.enable_systest_events:
             # Only available in 0.10.0 and up
@@ -190,6 +193,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
             if node.version <= LATEST_0_10_0:
                 cmd += " --new-consumer"
             cmd += " --bootstrap-server %(broker_list)s" % args
+            if node.version >= V_0_11_0_0:
+                cmd += " --isolation-level %s" % self.isolation_level
         else:
             cmd += " --zookeeper %(zk_connect)s" % args
         if self.from_beginning:

http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/tests/kafkatest/services/transactional_message_copier.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/transactional_message_copier.py b/tests/kafkatest/services/transactional_message_copier.py
new file mode 100644
index 0000000..153e02c
--- /dev/null
+++ b/tests/kafkatest/services/transactional_message_copier.py
@@ -0,0 +1,183 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import json
+import signal
+
+from ducktape.utils.util import wait_until
+from ducktape.services.background_thread import BackgroundThreadService
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from ducktape.cluster.remoteaccount import RemoteCommandError
+
+class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService):
+    """This service wraps org.apache.kafka.tools.TransactionalMessageCopier for
+    use in system testing.
+    """
+    PERSISTENT_ROOT = "/mnt/transactional_message_copier"
+    STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "transactional_message_copier.stdout")
+    STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "transactional_message_copier.stderr")
+    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+    LOG_FILE = os.path.join(LOG_DIR, "transactional_message_copier.log")
+    LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+
+    logs = {
+        "transactional_message_copier_stdout": {
+            "path": STDOUT_CAPTURE,
+            "collect_default": True},
+        "transactional_message_copier_stderr": {
+            "path": STDERR_CAPTURE,
+            "collect_default": True},
+        "transactional_message_copier_log": {
+            "path": LOG_FILE,
+            "collect_default": True}
+    }
+
+    def __init__(self, context, num_nodes, kafka, transactional_id, consumer_group,
+                 input_topic, input_partition, output_topic, max_messages = -1,
+                 transaction_size = 1000, log_level="INFO"):
+        super(TransactionalMessageCopier, self).__init__(context, num_nodes)
+        self.log_level = log_level
+        self.kafka = kafka
+        self.transactional_id = transactional_id
+        self.consumer_group = consumer_group
+        self.transaction_size = transaction_size
+        self.input_topic = input_topic
+        self.input_partition = input_partition
+        self.output_topic = output_topic
+        self.max_messages = max_messages
+        self.message_copy_finished = False
+        self.consumed = -1
+        self.remaining = -1
+        self.stop_timeout_sec = 60
+
+    def _worker(self, idx, node):
+        node.account.ssh("mkdir -p %s" % TransactionalMessageCopier.PERSISTENT_ROOT,
+                         allow_fail=False)
+        # Create and upload log properties
+        log_config = self.render('tools_log4j.properties',
+                                 log_file=TransactionalMessageCopier.LOG_FILE)
+        node.account.create_file(TransactionalMessageCopier.LOG4J_CONFIG, log_config)
+        # Configure security
+        self.security_config = self.kafka.security_config.client_config(node=node)
+        self.security_config.setup_node(node)
+        cmd = self.start_cmd(node, idx)
+        self.logger.debug("TransactionalMessageCopier %d command: %s" % (idx, cmd))
+        try:
+            for line in node.account.ssh_capture(cmd):
+                line = line.strip()
+                data = self.try_parse_json(line)
+                if data is not None:
+                    with self.lock:
+                        self.remaining = int(data["remaining"])
+                        self.consumed = int(data["consumed"])
+                        self.logger.info("%s: consumed %d, remaining %d" %
+                                         (self.transactional_id, self.consumed, self.remaining))
+                        if "shutdown_complete" in data:
+                           if self.remaining == 0:
+                                # We are only finished if the number of
+                                # remaining messages at the time of shutdown is 0.
+                                #
+                                # Otherwise a clean shutdown would still print
+                                # a 'shutdown complete' message even though
+                                # there are unprocessed messages, causing
+                                # tests to fail.
+                                self.logger.info("%s : Finished message copy" % self.transactional_id)
+                                self.message_copy_finished = True
+                           else:
+                               self.logger.info("%s : Shut down without finishing message copy." %\
+                                                self.transactional_id)
+        except RemoteCommandError as e:
+            self.logger.debug("Got exception while reading output from copier, \
+                              probably because it was SIGKILL'd (exit code 137): %s" % str(e))
+
+    def start_cmd(self, node, idx):
+        cmd  = "export LOG_DIR=%s;" % TransactionalMessageCopier.LOG_DIR
+        cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
+        cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % TransactionalMessageCopier.LOG4J_CONFIG
+        cmd += self.path.script("kafka-run-class.sh", node) + " org.apache.kafka.tools." + "TransactionalMessageCopier"
+        cmd += " --broker-list %s" % self.kafka.bootstrap_servers(self.security_config.security_protocol)
+        cmd += " --transactional-id %s" % self.transactional_id
+        cmd += " --consumer-group %s" % self.consumer_group
+        cmd += " --input-topic %s" % self.input_topic
+        cmd += " --output-topic %s" % self.output_topic
+        cmd += " --input-partition %s" % str(self.input_partition)
+        cmd += " --transaction-size %s" % str(self.transaction_size)
+        if self.max_messages > 0:
+            cmd += " --max-messages %s" % str(self.max_messages)
+        cmd += " 2>> %s | tee -a %s &" % (TransactionalMessageCopier.STDERR_CAPTURE, TransactionalMessageCopier.STDOUT_CAPTURE)
+
+        return cmd
+
+    def clean_node(self, node, clean_shutdown=True):
+        self.kill_node(node, clean_shutdown)
+        node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
+        self.security_config.clean_node(node)
+
+    def pids(self, node):
+        try:
+            cmd = "ps ax | grep -i TransactionalMessageCopier | grep java | grep -v grep | awk '{print $1}'"
+            pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
+            return pid_arr
+        except (RemoteCommandError, ValueError) as e:
+            self.logger.error("Could not list pids: %s" % str(e))
+            return []
+
+    def alive(self, node):
+        return len(self.pids(node)) > 0
+
+    def kill_node(self, node, clean_shutdown=True):
+        pids = self.pids(node)
+        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
+        for pid in pids:
+            node.account.signal(pid, sig)
+            wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=60, err_msg="Message Copier failed to stop")
+
+    def stop_node(self, node, clean_shutdown=True):
+        self.kill_node(node, clean_shutdown)
+        stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+        assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+            (str(node.account), str(self.stop_timeout_sec))
+
+    def restart(self, clean_shutdown):
+        if self.is_done:
+            return
+        node = self.nodes[0]
+        with self.lock:
+            self.consumed = -1
+            self.remaining = -1
+        self.stop_node(node, clean_shutdown)
+        self.start_node(node)
+
+    def try_parse_json(self, string):
+        """Try to parse a string as json. Return None if not parseable."""
+        try:
+            record = json.loads(string)
+            return record
+        except ValueError:
+            self.logger.debug("Could not parse as json: %s" % str(string))
+            return None
+
+    @property
+    def is_done(self):
+        return self.message_copy_finished
+
+    def progress_percent(self):
+        with self.lock:
+            if self.remaining < 0:
+                return 0
+            if self.consumed + self.remaining == 0:
+                return 100
+            return (float(self.consumed)/float(self.consumed + self.remaining)) * 100
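
For reference, try_parse_json above expects the copier to print one JSON object per line on stdout. The two shapes it keys off ("consumed"/"remaining" for progress, plus "shutdown_complete" on exit) are produced by statusAsJson and shutDownString in the Java tool further down; illustrative examples (field values made up) look like:

    {"progress": "my-first-transactional-id", "consumed": 5000, "remaining": 5000}
    {"shutdown_complete": "my-first-transactional-id", "consumed": 10000, "remaining": 0}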

http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/tests/kafkatest/tests/core/transactions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py
new file mode 100644
index 0000000..a98a1c9
--- /dev/null
+++ b/tests/kafkatest/tests/core/transactions_test.py
@@ -0,0 +1,207 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.transactional_message_copier import TransactionalMessageCopier
+from kafkatest.utils import is_int
+
+from ducktape.tests.test import Test
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+
+class TransactionsTest(Test):
+    """Tests transactions by transactionally copying data from a source topic to
+    a destination topic and killing the copy process as well as the broker
+    randomly through the process. In the end we verify that the final output
+    topic contains exactly one committed copy of each message in the input
+    topic
+    """
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(TransactionsTest, self).__init__(test_context=test_context)
+
+        self.input_topic = "input-topic"
+        self.output_topic = "output-topic"
+
+        self.num_brokers = 3
+
+        # Test parameters
+        self.num_input_partitions = 2
+        self.num_output_partitions = 3
+        self.num_seed_messages = 20000
+        self.transaction_size = 500
+        self.first_transactional_id = "my-first-transactional-id"
+        self.second_transactional_id = "my-second-transactional-id"
+        self.consumer_group = "transactions-test-consumer-group"
+
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context,
+                                  num_nodes=self.num_brokers,
+                                  zk=self.zk,
+                                  topics = {
+                                      self.input_topic: {
+                                          "partitions": self.num_input_partitions,
+                                          "replication-factor": 3,
+                                          "configs": {
+                                              "min.insync.replicas": 2
+                                          }
+                                      },
+                                      self.output_topic: {
+                                          "partitions": self.num_output_partitions,
+                                          "replication-factor": 3,
+                                          "configs": {
+                                              "min.insync.replicas": 2
+                                          }
+                                      }
+                                  })
+
+    def setUp(self):
+        self.zk.start()
+
+    def seed_messages(self):
+        seed_timeout_sec = 10000
+        seed_producer = VerifiableProducer(context=self.test_context,
+                                           num_nodes=1,
+                                           kafka=self.kafka,
+                                           topic=self.input_topic,
+                                           message_validator=is_int,
+                                           max_messages=self.num_seed_messages,
+                                           enable_idempotence=True)
+
+        seed_producer.start()
+        wait_until(lambda: seed_producer.num_acked >= self.num_seed_messages,
+                   timeout_sec=seed_timeout_sec,
+                   err_msg="Producer failed to produce %d messages in %ds." %\
+                   (self.num_seed_messages, seed_timeout_sec))
+        return seed_producer.acked
+
+    def get_messages_from_output_topic(self):
+        consumer = ConsoleConsumer(context=self.test_context,
+                                   num_nodes=1,
+                                   kafka=self.kafka,
+                                   topic=self.output_topic,
+                                   new_consumer=True,
+                                   message_validator=is_int,
+                                   from_beginning=True,
+                                   consumer_timeout_ms=5000,
+                                   isolation_level="read_committed")
+        consumer.start()
+        # ensure that the consumer is up.
+        wait_until(lambda: consumer.alive(consumer.nodes[0]) == True,
+                   timeout_sec=60,
+                   err_msg="Consumer failed to start for %ds" %\
+                   60)
+        # wait until the consumer closes, which will be 5 seconds after
+        # receiving the last message.
+        wait_until(lambda: consumer.alive(consumer.nodes[0]) == False,
+                   timeout_sec=60,
+                   err_msg="Consumer failed to consume %d messages in %ds" %\
+                   (self.num_seed_messages, 60))
+        return consumer.messages_consumed[1]
+
+    def bounce_brokers(self, clean_shutdown):
+       for node in self.kafka.nodes:
+            if clean_shutdown:
+                self.kafka.restart_node(node, clean_shutdown = True)
+            else:
+                self.kafka.stop_node(node, clean_shutdown = False)
+                wait_until(lambda: len(self.kafka.pids(node)) == 0 and not self.kafka.is_registered(node),
+                           timeout_sec=self.kafka.zk_session_timeout + 5,
+                           err_msg="Failed to see timely deregistration of \
+                           hard-killed broker %s" % str(node.account))
+                self.kafka.start_node(node)
+
+    def create_and_start_message_copier(self, input_partition, transactional_id):
+        message_copier = TransactionalMessageCopier(
+            context=self.test_context,
+            num_nodes=1,
+            kafka=self.kafka,
+            transactional_id=transactional_id,
+            consumer_group=self.consumer_group,
+            input_topic=self.input_topic,
+            input_partition=input_partition,
+            output_topic=self.output_topic,
+            max_messages=-1,
+            transaction_size=self.transaction_size
+        )
+        message_copier.start()
+        wait_until(lambda: message_copier.alive(message_copier.nodes[0]),
+                   timeout_sec=10,
+                   err_msg="Message copier failed to start after 10 s")
+        return message_copier
+
+    def bounce_copiers(self, copiers, clean_shutdown):
+        for _ in range(3):
+            for copier in copiers:
+                wait_until(lambda: copier.progress_percent() >= 20.0,
+                           timeout_sec=30,
+                           err_msg="%s : Message copier didn't make enough progress in 30s. Current progress: %s" \
+                           % (copier.transactional_id, str(copier.progress_percent())))
+                self.logger.info("%s - progress: %s" % (copier.transactional_id,
+                                                        str(copier.progress_percent())))
+                copier.restart(clean_shutdown)
+
+    def create_and_start_copiers(self):
+        copiers = []
+        copiers.append(self.create_and_start_message_copier(
+            input_partition=0,
+            transactional_id=self.first_transactional_id
+        ))
+        copiers.append(self.create_and_start_message_copier(
+            input_partition=1,
+            transactional_id=self.second_transactional_id
+        ))
+        return copiers
+
+    def copy_messages_transactionally(self, failure_mode, bounce_target):
+        copiers = self.create_and_start_copiers()
+        clean_shutdown = False
+        if failure_mode == "clean_bounce":
+            clean_shutdown = True
+
+        if bounce_target == "brokers":
+            self.bounce_brokers(clean_shutdown)
+        elif bounce_target == "clients":
+            self.bounce_copiers(copiers, clean_shutdown)
+
+        for copier in copiers:
+            wait_until(lambda: copier.is_done,
+                       timeout_sec=60,
+                       err_msg="%s - Failed to copy all messages in %ds." %\
+                       (copier.transactional_id, 60))
+        self.logger.info("finished copying messages")
+
+    @cluster(num_nodes=8)
+    @matrix(failure_mode=["clean_bounce", "hard_bounce"],
+            bounce_target=["brokers", "clients"])
+    def test_transactions(self, failure_mode, bounce_target):
+        security_protocol = 'PLAINTEXT'
+        self.kafka.security_protocol = security_protocol
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.start()
+        input_messages = self.seed_messages()
+        self.copy_messages_transactionally(failure_mode, bounce_target)
+        output_messages = self.get_messages_from_output_topic()
+        output_message_set = set(output_messages)
+        input_message_set = set(input_messages)
+        num_dups = abs(len(output_messages) - len(output_message_set))
+        assert num_dups == 0, "Detected %d duplicates in the output stream" % num_dups
+        assert input_message_set == output_message_set, "Input and output message sets are not equal. Num input messages %d. Num output messages %d" %\
+            (len(input_message_set), len(output_message_set))
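
For what it's worth, the @cluster(num_nodes=8) annotation above appears to account for 3 brokers + 1 ZooKeeper node + 2 single-node message copiers + 1 node for the seed VerifiableProducer + 1 node for the verifying ConsoleConsumer = 8 nodes, matching the services the test allocates.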

http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/tests/kafkatest/version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 8e1497c..f63a7c1 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -88,3 +88,8 @@ V_0_10_2_1 = KafkaVersion("0.10.2.1")
 LATEST_0_10_2 = V_0_10_2_1
 
 LATEST_0_10 = LATEST_0_10_2
+
+# 0.11.0.0 versions
+V_0_11_0_0 = KafkaVersion("0.11.0.0")
+LATEST_0_11_0 = V_0_11_0_0
+LATEST_0_11 = LATEST_0_11_0

http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
new file mode 100644
index 0000000..c79c854
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+/**
+ * This class is primarily meant for use with system tests. It copies messages from an input partition to an output
+ * topic transactionally, committing the offsets and messages together.
+ */
+public class TransactionalMessageCopier {
+     /** Get the command-line argument parser. */
+    private static ArgumentParser argParser() {
+        ArgumentParser parser = ArgumentParsers
+                .newArgumentParser("transactional-message-copier")
+                .defaultHelp(true)
+                .description("This tool copies messages transactionally from an input partition to an output topic, committing the consumed offsets along with the output messages");
+
+        parser.addArgument("--input-topic")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("INPUT-TOPIC")
+                .dest("inputTopic")
+                .help("Consume messages from this topic");
+
+        parser.addArgument("--input-partition")
+                .action(store())
+                .required(true)
+                .type(Integer.class)
+                .metavar("INPUT-PARTITION")
+                .dest("inputPartition")
+                .help("Consume messages from this partition of the input topic.");
+
+
+        parser.addArgument("--output-topic")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("OUTPUT-TOPIC")
+                .dest("outputTopic")
+                .help("Produce messages to this topic");
+
+        parser.addArgument("--broker-list")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
+                .dest("brokerList")
+                .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
+
+        parser.addArgument("--max-messages")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .metavar("MAX-MESSAGES")
+                .dest("maxMessages")
+                .help("Process at most this many messages, up to the end offset at the time this program was launched. If set to -1, " +
+                        "read up to the end offset of the input partition (as of the time the program was launched).");
+
+        parser.addArgument("--consumer-group")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(String.class)
+                .metavar("CONSUMER-GROUP")
+                .dest("consumerGroup")
+                .help("The consumer group id to use for storing the consumer offsets.");
+
+        parser.addArgument("--transaction-size")
+                .action(store())
+                .required(false)
+                .setDefault(200)
+                .type(Integer.class)
+                .metavar("TRANSACTION-SIZE")
+                .dest("messagesPerTransaction")
+                .help("The number of messages to put in each transaction. Default is 200.");
+
+
+        parser.addArgument("--transactional-id")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("TRANSACTIONAL-ID")
+                .dest("transactionalId")
+                .help("The transactionalId to assign to the producer");
+
+
+        return parser;
+    }
+
+    private static KafkaProducer<String, String> createProducer(Namespace parsedArgs) {
+        String transactionalId = parsedArgs.getString("transactionalId");
+        String brokerList = parsedArgs.getString("brokerList");
+
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringSerializer");
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringSerializer");
+
+        return new KafkaProducer<>(props);
+    }
+
+    private static KafkaConsumer<String, String> createConsumer(Namespace parsedArgs, TopicPartition inputPartition) {
+        String consumerGroup = parsedArgs.getString("consumerGroup");
+        String brokerList = parsedArgs.getString("brokerList");
+        Integer numMessagesPerTransaction = parsedArgs.getInt("messagesPerTransaction");
+
+        Properties props = new Properties();
+
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, numMessagesPerTransaction.toString());
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
+        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+
+        return consumer;
+
+    }
+
+    private static ProducerRecord<String, String> producerRecordFromConsumerRecord(String topic, ConsumerRecord<String, String> record) {
+        return new ProducerRecord<>(topic, record.key(), record.value());
+    }
+
+    private static Map<TopicPartition, OffsetAndMetadata> consumerPositions(KafkaConsumer<String, String> consumer) {
+        Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>();
+        for (TopicPartition topicPartition : consumer.assignment()) {
+            positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
+        }
+        return positions;
+    }
+
+    private static long messagesRemaining(KafkaConsumer<String, String> consumer, TopicPartition partition) {
+        long currentPosition = consumer.position(partition);
+        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Arrays.asList(partition));
+        if (endOffsets.containsKey(partition)) {
+            return endOffsets.get(partition) - currentPosition;
+        }
+        return 0;
+    }
+
+    private static String toJsonString(Map<String, Object> data) {
+        String json;
+        try {
+            ObjectMapper mapper = new ObjectMapper();
+            json = mapper.writeValueAsString(data);
+        } catch (JsonProcessingException e) {
+            json = "Bad data can't be written as json: " + e.getMessage();
+        }
+        return json;
+    }
+
+    private static String statusAsJson(long consumed, long remaining, String transactionalId) {
+        Map<String, Object> statusData = new HashMap<>();
+        statusData.put("progress", transactionalId);
+        statusData.put("consumed", consumed);
+        statusData.put("remaining", remaining);
+        return toJsonString(statusData);
+    }
+
+    private static String shutDownString(long consumed, long remaining, String transactionalId) {
+        Map<String, Object> shutdownData = new HashMap<>();
+        shutdownData.put("remaining", remaining);
+        shutdownData.put("consumed", consumed);
+        shutdownData.put("shutdown_complete", transactionalId);
+        return toJsonString(shutdownData);
+    }
+
+    public static void main(String[] args) throws IOException {
+        Namespace parsedArgs = argParser().parseArgsOrFail(args);
+        Integer numMessagesPerTransaction = parsedArgs.getInt("messagesPerTransaction");
+        final String transactionalId = parsedArgs.getString("transactionalId");
+        final String outputTopic = parsedArgs.getString("outputTopic");
+
+        String consumerGroup = parsedArgs.getString("consumerGroup");
+        TopicPartition inputPartition = new TopicPartition(parsedArgs.getString("inputTopic"), parsedArgs.getInt("inputPartition"));
+
+        final KafkaProducer<String, String> producer = createProducer(parsedArgs);
+        final KafkaConsumer<String, String> consumer = createConsumer(parsedArgs, inputPartition);
+
+        consumer.assign(Arrays.asList(inputPartition));
+
+        long maxMessages = parsedArgs.getInt("maxMessages") == -1 ? Long.MAX_VALUE : parsedArgs.getInt("maxMessages");
+        maxMessages = Math.min(messagesRemaining(consumer, inputPartition), maxMessages);
+
+        producer.initTransactions();
+
+
+        final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
+        final AtomicLong remainingMessages = new AtomicLong(maxMessages);
+        final AtomicLong numMessagesProcessed = new AtomicLong(0);
+        int exitCode = 0;
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                isShuttingDown.set(true);
+                // Flush any remaining messages
+                producer.close();
+                synchronized (consumer) {
+                    consumer.close();
+                }
+                System.out.println(shutDownString(numMessagesProcessed.get(), remainingMessages.get(), transactionalId));
+            }
+        });
+
+        try {
+            while (0 < remainingMessages.get()) {
+                if ((((double) numMessagesProcessed.get() / maxMessages) * 100) % 10 == 0) {
+                    // print status for every 10% we progress.
+                    System.out.println(statusAsJson(numMessagesProcessed.get(), remainingMessages.get(), transactionalId));
+                }
+                if (isShuttingDown.get())
+                    break;
+                int messagesInCurrentTransaction = 0;
+                long numMessagesForNextTransaction = Math.min(numMessagesPerTransaction, remainingMessages.get());
+                producer.beginTransaction();
+
+                while (messagesInCurrentTransaction < numMessagesForNextTransaction) {
+                    ConsumerRecords<String, String> records = consumer.poll(200L);
+                    for (ConsumerRecord<String, String> record : records) {
+                        producer.send(producerRecordFromConsumerRecord(outputTopic, record));
+                        messagesInCurrentTransaction++;
+                    }
+                }
+                producer.sendOffsetsToTransaction(consumerPositions(consumer), consumerGroup);
+                producer.commitTransaction();
+                remainingMessages.set(maxMessages - numMessagesProcessed.addAndGet(messagesInCurrentTransaction));
+            }
+        } finally {
+            producer.close();
+            synchronized (consumer) {
+                consumer.close();
+            }
+        }
+        System.exit(exitCode);
+    }
+}
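
Taken together, the tool above is the consume-transform-produce transactional pattern that the new system test exercises. Stripped of argument parsing, shutdown handling, and progress reporting, the core API sequence looks roughly like the following sketch; the broker address, topics, group and transactional id are placeholders, and the abort path shown in the catch block is the conventional pattern rather than something the tool above does (it simply exits on error):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.TopicPartition;

    public class MinimalTransactionalCopy {
        public static void main(String[] args) throws Exception {
            Properties pprops = new Properties();
            pprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
            pprops.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "copier-0");          // placeholder
            pprops.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            pprops.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");

            Properties cprops = new Properties();
            cprops.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
            cprops.put(ConsumerConfig.GROUP_ID_CONFIG, "copy-group");                // placeholder
            cprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            cprops.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            cprops.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            cprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            cprops.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");

            TopicPartition input = new TopicPartition("input-topic", 0);
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(pprops);
                 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cprops)) {
                consumer.assign(Collections.singletonList(input));
                producer.initTransactions();
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(200L);
                    if (records.isEmpty())
                        continue;
                    producer.beginTransaction();
                    try {
                        for (ConsumerRecord<String, String> record : records)
                            producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
                        // Commit the consumed offsets atomically with the copied messages.
                        producer.sendOffsetsToTransaction(
                                Collections.singletonMap(input, new OffsetAndMetadata(consumer.position(input))),
                                "copy-group");
                        producer.commitTransaction();
                    } catch (Exception e) {
                        // Conventional pattern: abort so a restarted copier re-reads from the last
                        // committed offsets. Fatal errors still require closing the producer.
                        producer.abortTransaction();
                        throw e;
                    }
                }
            }
        }
    }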