You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/06/01 17:26:52 UTC
kafka git commit: KAFKA-5281; System tests for transactions
Repository: kafka
Updated Branches:
refs/heads/trunk 8e8b3c565 -> 1959835d9
KAFKA-5281; System tests for transactions
Author: Apurva Mehta <ap...@confluent.io>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #3149 from apurvam/KAFKA-5281-transactions-system-tests
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1959835d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1959835d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1959835d
Branch: refs/heads/trunk
Commit: 1959835d9e148f0eb6407b36ff96b334d5e785cb
Parents: 8e8b3c5
Author: Apurva Mehta <ap...@confluent.io>
Authored: Thu Jun 1 10:25:29 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Jun 1 10:25:29 2017 -0700
----------------------------------------------------------------------
.../producer/internals/TransactionManager.java | 4 +-
.../common/requests/OffsetCommitResponse.java | 1 +
.../requests/TxnOffsetCommitResponse.java | 1 +
...nsactionMarkerRequestCompletionHandler.scala | 3 +-
.../scala/kafka/tools/ConsoleConsumer.scala | 12 +-
tests/kafkatest/services/console_consumer.py | 9 +-
.../services/transactional_message_copier.py | 183 ++++++++++++
tests/kafkatest/tests/core/transactions_test.py | 207 +++++++++++++
tests/kafkatest/version.py | 5 +
.../kafka/tools/TransactionalMessageCopier.java | 287 +++++++++++++++++++
10 files changed, 706 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 221816c..11068a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -928,7 +928,9 @@ public class TransactionManager {
Errors error = entry.getValue();
if (error == Errors.NONE) {
pendingTxnOffsetCommits.remove(topicPartition);
- } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
+ } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
+ || error == Errors.NOT_COORDINATOR
+ || error == Errors.REQUEST_TIMED_OUT) {
hadFailure = true;
if (!coordinatorReloaded) {
coordinatorReloaded = true;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 06e5608..782ffa5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -44,6 +44,7 @@ public class OffsetCommitResponse extends AbstractResponse {
* Possible error codes:
*
* UNKNOWN_TOPIC_OR_PARTITION (3)
+ * REQUEST_TIMED_OUT (7)
* OFFSET_METADATA_TOO_LARGE (12)
* COORDINATOR_LOAD_IN_PROGRESS (14)
* GROUP_COORDINATOR_NOT_AVAILABLE (15)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index e7b349c..4c0f010 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -42,6 +42,7 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
// GroupAuthorizationFailed
// InvalidCommitOffsetSize
// TransactionalIdAuthorizationFailed
+ // RequestTimedOut
private final Map<TopicPartition, Errors> errors;
private final int throttleTimeMs;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 5fa6035..da40001 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -139,7 +139,8 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
case Errors.UNKNOWN_TOPIC_OR_PARTITION |
Errors.NOT_LEADER_FOR_PARTITION |
Errors.NOT_ENOUGH_REPLICAS |
- Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => // these are retriable errors
+ Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND |
+ Errors.REQUEST_TIMED_OUT => // these are retriable errors
info(s"Sending $transactionalId's transaction marker for partition $topicPartition has failed with error ${error.exceptionName}, retrying " +
s"with current coordinator epoch ${epochAndMetadata.coordinatorEpoch}")
http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 193a344..335c724 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -200,7 +200,7 @@ object ConsoleConsumer extends Logging {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-
+ props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.isolationLevel)
props
}
@@ -264,7 +264,7 @@ object ConsoleConsumer extends Logging {
"skip it instead of halt.")
val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
- "set, the csv metrics will be outputed here")
+ " set, the csv metrics will be output here") // leading space: the fragment above ends in "is" with no trailing space
.withRequiredArg
.describedAs("metrics directory")
.ofType(classOf[java.lang.String])
@@ -284,6 +284,13 @@ object ConsoleConsumer extends Logging {
val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
"Log lifecycle events of the consumer in addition to logging consumed " +
"messages. (This is specific for system tests.)")
+ val isolationLevelOpt = parser.accepts("isolation-level", // value is passed to the new consumer as ConsumerConfig.ISOLATION_LEVEL_CONFIG
+ "Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted " +
+ "to read all messages.")
+ .withRequiredArg()
+ .ofType(classOf[String])
+ .defaultsTo("read_uncommitted")
+
if (args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")
@@ -314,6 +321,7 @@ object ConsoleConsumer extends Logging {
val bootstrapServer = options.valueOf(bootstrapServerOpt)
val keyDeserializer = options.valueOf(keyDeserializerOpt)
val valueDeserializer = options.valueOf(valueDeserializerOpt)
+ val isolationLevel: String = options.valueOf(isolationLevelOpt) // already a String (ofType(classOf[String]) with a default), no toString needed
val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
if (keyDeserializer != null && !keyDeserializer.isEmpty) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index d55d012..6fad674 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -21,7 +21,7 @@ from ducktape.cluster.remoteaccount import RemoteCommandError
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.monitor.jmx import JmxMixin
-from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0
+from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0
"""
0.8.2.1 ConsoleConsumer options
@@ -97,7 +97,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=True,
message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=DEV_BRANCH,
client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None,
- enable_systest_events=False, stop_timeout_sec=15, print_timestamp=False):
+ enable_systest_events=False, stop_timeout_sec=15, print_timestamp=False,
+ isolation_level="read_uncommitted"):
"""
Args:
context: standard context
@@ -117,6 +118,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
stop_timeout_sec After stopping a node, wait up to stop_timeout_sec for the node to stop,
and the corresponding background thread to finish successfully.
print_timestamp if True, print each message's timestamp as well
+ isolation_level How to handle transactional messages.
"""
JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [])
BackgroundThreadService.__init__(self, context, num_nodes)
@@ -140,6 +142,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
self.log_level = "TRACE"
self.stop_timeout_sec = stop_timeout_sec
+ self.isolation_level = isolation_level
self.enable_systest_events = enable_systest_events
if self.enable_systest_events:
# Only available in 0.10.0 and up
@@ -190,6 +193,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
if node.version <= LATEST_0_10_0:
cmd += " --new-consumer"
cmd += " --bootstrap-server %(broker_list)s" % args
+ if node.version >= V_0_11_0_0:
+ cmd += " --isolation-level %s" % self.isolation_level
else:
cmd += " --zookeeper %(zk_connect)s" % args
if self.from_beginning:
http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/tests/kafkatest/services/transactional_message_copier.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/transactional_message_copier.py b/tests/kafkatest/services/transactional_message_copier.py
new file mode 100644
index 0000000..153e02c
--- /dev/null
+++ b/tests/kafkatest/services/transactional_message_copier.py
@@ -0,0 +1,183 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import json
+import signal
+
+from ducktape.utils.util import wait_until
+from ducktape.services.background_thread import BackgroundThreadService
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from ducktape.cluster.remoteaccount import RemoteCommandError
+
+class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService):
+    """This service wraps org.apache.kafka.tools.TransactionalMessageCopier for
+    use in system testing, tracking progress from the JSON lines the tool prints.
+    """
+    PERSISTENT_ROOT = "/mnt/transactional_message_copier"
+    STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "transactional_message_copier.stdout")
+    STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "transactional_message_copier.stderr")
+    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+    LOG_FILE = os.path.join(LOG_DIR, "transactional_message_copier.log")
+    LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+
+    logs = {
+        "transactional_message_copier_stdout": {
+            "path": STDOUT_CAPTURE,
+            "collect_default": True},
+        "transactional_message_copier_stderr": {
+            "path": STDERR_CAPTURE,
+            "collect_default": True},
+        "transactional_message_copier_log": {
+            "path": LOG_FILE,
+            "collect_default": True}
+    }
+
+    def __init__(self, context, num_nodes, kafka, transactional_id, consumer_group,
+                 input_topic, input_partition, output_topic, max_messages=-1,
+                 transaction_size=1000, log_level="INFO"):
+        super(TransactionalMessageCopier, self).__init__(context, num_nodes)
+        self.log_level = log_level
+        self.kafka = kafka
+        self.transactional_id = transactional_id
+        self.consumer_group = consumer_group
+        self.transaction_size = transaction_size
+        self.input_topic = input_topic
+        self.input_partition = input_partition
+        self.output_topic = output_topic
+        self.max_messages = max_messages
+        self.message_copy_finished = False
+        self.consumed = -1
+        self.remaining = -1
+        self.stop_timeout_sec = 60
+        self.security_config = None  # set in _worker; clean_node may run before any worker has
+
+    def _worker(self, idx, node):
+        node.account.ssh("mkdir -p %s" % TransactionalMessageCopier.PERSISTENT_ROOT,
+                         allow_fail=False)
+        # Create and upload log properties
+        log_config = self.render('tools_log4j.properties',
+                                 log_file=TransactionalMessageCopier.LOG_FILE)
+        node.account.create_file(TransactionalMessageCopier.LOG4J_CONFIG, log_config)
+        # Configure security
+        self.security_config = self.kafka.security_config.client_config(node=node)
+        self.security_config.setup_node(node)
+        cmd = self.start_cmd(node, idx)
+        self.logger.debug("TransactionalMessageCopier %d command: %s" % (idx, cmd))
+        try:
+            for line in node.account.ssh_capture(cmd):
+                line = line.strip()
+                data = self.try_parse_json(line)
+                if data is not None:
+                    with self.lock:
+                        self.remaining = int(data["remaining"])
+                        self.consumed = int(data["consumed"])
+                        self.logger.info("%s: consumed %d, remaining %d" %
+                                         (self.transactional_id, self.consumed, self.remaining))
+                        if "shutdown_complete" in data:
+                            if self.remaining == 0:
+                                # We are only finished if the remaining
+                                # messages at the time of shutdown is 0.
+                                #
+                                # Otherwise a clean shutdown would still print
+                                # a 'shutdown complete' message even though
+                                # there are unprocessed messages, causing
+                                # tests to fail.
+                                self.logger.info("%s : Finished message copy" % self.transactional_id)
+                                self.message_copy_finished = True
+                            else:
+                                self.logger.info("%s : Shut down without finishing message copy." %
+                                                 self.transactional_id)
+        except RemoteCommandError as e:
+            self.logger.debug("Got exception while reading output from copier, "
+                              "probably because it was SIGKILL'd (exit code 137): %s" % str(e))
+
+    def start_cmd(self, node, idx):
+        cmd = "export LOG_DIR=%s;" % TransactionalMessageCopier.LOG_DIR
+        cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
+        cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % TransactionalMessageCopier.LOG4J_CONFIG
+        cmd += self.path.script("kafka-run-class.sh", node) + " org.apache.kafka.tools." + "TransactionalMessageCopier"
+        cmd += " --broker-list %s" % self.kafka.bootstrap_servers(self.security_config.security_protocol)
+        cmd += " --transactional-id %s" % self.transactional_id
+        cmd += " --consumer-group %s" % self.consumer_group
+        cmd += " --input-topic %s" % self.input_topic
+        cmd += " --output-topic %s" % self.output_topic
+        cmd += " --input-partition %s" % str(self.input_partition)
+        cmd += " --transaction-size %s" % str(self.transaction_size)
+        if self.max_messages > 0:
+            cmd += " --max-messages %s" % str(self.max_messages)
+        cmd += " 2>> %s | tee -a %s &" % (TransactionalMessageCopier.STDERR_CAPTURE, TransactionalMessageCopier.STDOUT_CAPTURE)
+
+        return cmd
+
+    def clean_node(self, node, clean_shutdown=True):
+        self.kill_node(node, clean_shutdown)
+        node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
+        if self.security_config is not None:
+            self.security_config.clean_node(node)
+
+    def pids(self, node):
+        try:
+            cmd = "ps ax | grep -i TransactionalMessageCopier | grep java | grep -v grep | awk '{print $1}'"
+            return list(node.account.ssh_capture(cmd, allow_fail=True, callback=int))
+        except (RemoteCommandError, ValueError) as e:
+            self.logger.error("Could not list pids: %s" % str(e))
+            return []
+
+    def alive(self, node):
+        return len(self.pids(node)) > 0
+
+    def kill_node(self, node, clean_shutdown=True):
+        pids = self.pids(node)
+        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
+        for pid in pids:
+            node.account.signal(pid, sig)
+        wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=60, err_msg="Message Copier failed to stop")
+
+    def stop_node(self, node, clean_shutdown=True):
+        self.kill_node(node, clean_shutdown)
+        stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+        assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+            (str(node.account), str(self.stop_timeout_sec))
+
+    def restart(self, clean_shutdown):
+        if self.is_done:
+            return
+        node = self.nodes[0]
+        with self.lock:
+            self.consumed = -1
+            self.remaining = -1
+        self.stop_node(node, clean_shutdown)  # outside the lock: _worker takes it while reading output
+        self.start_node(node)
+
+    def try_parse_json(self, string):
+        """Try to parse a string as json. Return None if not parseable."""
+        try:
+            return json.loads(string)
+        except ValueError:
+            self.logger.debug("Could not parse as json: %s" % str(string))
+            return None
+
+    @property
+    def is_done(self):
+        return self.message_copy_finished
+
+    def progress_percent(self):
+        with self.lock:
+            if self.remaining < 0:  # no progress report since the last (re)start
+                return 0
+            if self.consumed + self.remaining == 0:
+                return 100
+            return (float(self.consumed)/float(self.consumed + self.remaining)) * 100
http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/tests/kafkatest/tests/core/transactions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py
new file mode 100644
index 0000000..a98a1c9
--- /dev/null
+++ b/tests/kafkatest/tests/core/transactions_test.py
@@ -0,0 +1,207 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.transactional_message_copier import TransactionalMessageCopier
+from kafkatest.utils import is_int
+
+from ducktape.tests.test import Test
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+
+class TransactionsTest(Test):
+    """Tests transactions by transactionally copying data from a source topic to
+    a destination topic while bouncing either the copy processes or the brokers,
+    cleanly or hard depending on the test parameters. In the end we verify that
+    the final output topic contains exactly one committed copy of each message
+    in the input topic.
+    """
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(TransactionsTest, self).__init__(test_context=test_context)
+
+        self.input_topic = "input-topic"
+        self.output_topic = "output-topic"
+
+        self.num_brokers = 3
+
+        # Test parameters
+        self.num_input_partitions = 2
+        self.num_output_partitions = 3
+        self.num_seed_messages = 20000
+        self.transaction_size = 500
+        self.first_transactional_id = "my-first-transactional-id"
+        self.second_transactional_id = "my-second-transactional-id"
+        self.consumer_group = "transactions-test-consumer-group"
+
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context,
+                                  num_nodes=self.num_brokers,
+                                  zk=self.zk,
+                                  topics={
+                                      self.input_topic: {
+                                          "partitions": self.num_input_partitions,
+                                          "replication-factor": 3,
+                                          "configs": {
+                                              "min.insync.replicas": 2
+                                          }
+                                      },
+                                      self.output_topic: {
+                                          "partitions": self.num_output_partitions,
+                                          "replication-factor": 3,
+                                          "configs": {
+                                              "min.insync.replicas": 2
+                                          }
+                                      }
+                                  })
+
+    def setUp(self):
+        self.zk.start()
+
+    def seed_messages(self):
+        seed_timeout_sec = 10000
+        seed_producer = VerifiableProducer(context=self.test_context,
+                                           num_nodes=1,
+                                           kafka=self.kafka,
+                                           topic=self.input_topic,
+                                           message_validator=is_int,
+                                           max_messages=self.num_seed_messages,
+                                           enable_idempotence=True)
+
+        seed_producer.start()
+        wait_until(lambda: seed_producer.num_acked >= self.num_seed_messages,
+                   timeout_sec=seed_timeout_sec,
+                   err_msg="Producer failed to produce messages %d in %ds." %\
+                   (self.num_seed_messages, seed_timeout_sec))
+        return seed_producer.acked
+
+    def get_messages_from_output_topic(self):
+        consumer = ConsoleConsumer(context=self.test_context,
+                                   num_nodes=1,
+                                   kafka=self.kafka,
+                                   topic=self.output_topic,
+                                   new_consumer=True,
+                                   message_validator=is_int,
+                                   from_beginning=True,
+                                   consumer_timeout_ms=5000,
+                                   isolation_level="read_committed")
+        consumer.start()
+        # ensure that the consumer is up.
+        wait_until(lambda: consumer.alive(consumer.nodes[0]),
+                   timeout_sec=60,
+                   err_msg="Consumer failed to start for %ds" %\
+                   60)
+        # wait until the consumer closes, which will be 5 seconds after
+        # receiving the last message.
+        wait_until(lambda: not consumer.alive(consumer.nodes[0]),
+                   timeout_sec=60,
+                   err_msg="Consumer failed to consume %d messages in %ds" %\
+                   (self.num_seed_messages, 60))
+        return consumer.messages_consumed[1]
+
+    def bounce_brokers(self, clean_shutdown):
+        for node in self.kafka.nodes:
+            if clean_shutdown:
+                self.kafka.restart_node(node, clean_shutdown=True)
+            else:
+                self.kafka.stop_node(node, clean_shutdown=False)
+                wait_until(lambda: len(self.kafka.pids(node)) == 0 and not self.kafka.is_registered(node),
+                           timeout_sec=self.kafka.zk_session_timeout + 5,
+                           err_msg="Failed to see timely deregistration of "
+                                   "hard-killed broker %s" % str(node.account))
+                self.kafka.start_node(node)
+
+    def create_and_start_message_copier(self, input_partition, transactional_id):
+        message_copier = TransactionalMessageCopier(
+            context=self.test_context,
+            num_nodes=1,
+            kafka=self.kafka,
+            transactional_id=transactional_id,
+            consumer_group=self.consumer_group,
+            input_topic=self.input_topic,
+            input_partition=input_partition,
+            output_topic=self.output_topic,
+            max_messages=-1,
+            transaction_size=self.transaction_size
+        )
+        message_copier.start()
+        wait_until(lambda: message_copier.alive(message_copier.nodes[0]),
+                   timeout_sec=10,
+                   err_msg="Message copier failed to start after 10 s")
+        return message_copier
+
+    def bounce_copiers(self, copiers, clean_shutdown):
+        for _ in range(3):
+            for copier in copiers:
+                wait_until(lambda: copier.progress_percent() >= 20.0,
+                           timeout_sec=30,
+                           err_msg="%s : Message copier didn't make enough progress in 30s. Current progress: %s" \
+                           % (copier.transactional_id, str(copier.progress_percent())))
+                self.logger.info("%s - progress: %s" % (copier.transactional_id,
+                                                        str(copier.progress_percent())))
+                copier.restart(clean_shutdown)
+
+    def create_and_start_copiers(self):
+        copiers = []
+        copiers.append(self.create_and_start_message_copier(
+            input_partition=0,
+            transactional_id=self.first_transactional_id
+        ))
+        copiers.append(self.create_and_start_message_copier(
+            input_partition=1,
+            transactional_id=self.second_transactional_id
+        ))
+        return copiers
+
+    def copy_messages_transactionally(self, failure_mode, bounce_target):
+        copiers = self.create_and_start_copiers()
+        clean_shutdown = False
+        if failure_mode == "clean_bounce":
+            clean_shutdown = True
+
+        if bounce_target == "brokers":
+            self.bounce_brokers(clean_shutdown)
+        elif bounce_target == "clients":
+            self.bounce_copiers(copiers, clean_shutdown)
+
+        for copier in copiers:
+            wait_until(lambda: copier.is_done,
+                       timeout_sec=60,
+                       err_msg="%s - Failed to copy all messages in %ds." %\
+                       (copier.transactional_id, 60))
+        self.logger.info("finished copying messages")
+
+    @cluster(num_nodes=8)
+    @matrix(failure_mode=["clean_bounce", "hard_bounce"],
+            bounce_target=["brokers", "clients"])
+    def test_transactions(self, failure_mode, bounce_target):
+        security_protocol = 'PLAINTEXT'
+        self.kafka.security_protocol = security_protocol
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.start()
+        input_messages = self.seed_messages()
+        self.copy_messages_transactionally(failure_mode, bounce_target)
+        output_messages = self.get_messages_from_output_topic()
+        output_message_set = set(output_messages)
+        input_message_set = set(input_messages)
+        num_dups = len(output_messages) - len(output_message_set)  # a set is never larger than its source list
+        assert num_dups == 0, "Detected %d duplicates in the output stream" % num_dups
+        assert input_message_set == output_message_set, "Input and output message sets are not equal. Num input messages %d. Num output messages %d" %\
+            (len(input_message_set), len(output_message_set))
http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/tests/kafkatest/version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 8e1497c..f63a7c1 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -88,3 +88,8 @@ V_0_10_2_1 = KafkaVersion("0.10.2.1")
LATEST_0_10_2 = V_0_10_2_1
LATEST_0_10 = LATEST_0_10_2
+
+# 0.11.0.0 versions (console_consumer passes --isolation-level from this version on)
+V_0_11_0_0 = KafkaVersion("0.11.0.0")
+LATEST_0_11_0 = V_0_11_0_0
+LATEST_0_11 = LATEST_0_11_0
http://git-wip-us.apache.org/repos/asf/kafka/blob/1959835d/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
new file mode 100644
index 0000000..c79c854
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+/**
+ * This class is primarily meant for use with system tests. It copies messages from an input partition to an output
+ * topic transactionally, committing the offsets and messages together.
+ *
+ * <p>Progress is written to stdout as one JSON object per line: a status object with keys {@code progress},
+ * {@code consumed} and {@code remaining} when copying starts and each time another 10% has been committed, and a
+ * final object with keys {@code shutdown_complete}, {@code consumed} and {@code remaining} from the JVM shutdown
+ * hook.
+ */
+public class TransactionalMessageCopier {
+    /** Reused for every JSON line instead of building a new mapper per call. */
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /** Get the command-line argument parser. */
+    private static ArgumentParser argParser() {
+        ArgumentParser parser = ArgumentParsers
+                .newArgumentParser("transactional-message-copier")
+                .defaultHelp(true)
+                .description("This tool copies messages transactionally from an input partition to an output topic, committing the consumed offsets along with the output messages");
+
+        parser.addArgument("--input-topic")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("INPUT-TOPIC")
+                .dest("inputTopic")
+                .help("Consume messages from this topic");
+
+        parser.addArgument("--input-partition")
+                .action(store())
+                .required(true)
+                .type(Integer.class)
+                .metavar("INPUT-PARTITION")
+                .dest("inputPartition")
+                .help("Consume messages from this partition of the input topic.");
+
+        parser.addArgument("--output-topic")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("OUTPUT-TOPIC")
+                .dest("outputTopic")
+                .help("Produce messages to this topic");
+
+        parser.addArgument("--broker-list")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
+                .dest("brokerList")
+                .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
+
+        parser.addArgument("--max-messages")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .metavar("MAX-MESSAGES")
+                .dest("maxMessages")
+                .help("Process these many messages upto the end offset at the time this program was launched. If set to -1 " +
+                        "we will just read to the end offset of the input partition (as of the time the program was launched).");
+
+        parser.addArgument("--consumer-group")
+                .action(store())
+                .required(false)
+                // String default, matching .type(String.class) and the getString() read in main.
+                .setDefault("-1")
+                .type(String.class)
+                .metavar("CONSUMER-GROUP")
+                .dest("consumerGroup")
+                .help("The consumer group id to use for storing the consumer offsets.");
+
+        parser.addArgument("--transaction-size")
+                .action(store())
+                .required(false)
+                .setDefault(200)
+                .type(Integer.class)
+                .metavar("TRANSACTION-SIZE")
+                .dest("messagesPerTransaction")
+                .help("The number of messages to put in each transaction. Default is 200.");
+
+        parser.addArgument("--transactional-id")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("TRANSACTIONAL-ID")
+                .dest("transactionalId")
+                .help("The transactionalId to assign to the producer");
+
+        return parser;
+    }
+
+    /** Creates a String/String producer using the given transactional id and broker list. */
+    private static KafkaProducer<String, String> createProducer(Namespace parsedArgs) {
+        String transactionalId = parsedArgs.getString("transactionalId");
+        String brokerList = parsedArgs.getString("brokerList");
+
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringSerializer");
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringSerializer");
+
+        return new KafkaProducer<>(props);
+    }
+
+    /**
+     * Creates a read_committed String/String consumer with auto-commit disabled. Offsets are committed only
+     * through the producer's transactions. {@code max.poll.records} is set to the transaction size.
+     *
+     * @param inputPartition currently unused; the caller assigns the partition itself
+     */
+    private static KafkaConsumer<String, String> createConsumer(Namespace parsedArgs, TopicPartition inputPartition) {
+        String consumerGroup = parsedArgs.getString("consumerGroup");
+        String brokerList = parsedArgs.getString("brokerList");
+        Integer numMessagesPerTransaction = parsedArgs.getInt("messagesPerTransaction");
+
+        Properties props = new Properties();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, numMessagesPerTransaction.toString());
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
+        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+
+        return new KafkaConsumer<>(props);
+    }
+
+    /** Copies key and value of {@code record} into a new record addressed to {@code topic}. */
+    private static ProducerRecord<String, String> producerRecordFromConsumerRecord(String topic, ConsumerRecord<String, String> record) {
+        return new ProducerRecord<>(topic, record.key(), record.value());
+    }
+
+    /** Current position of every assigned partition, in the form sendOffsetsToTransaction takes. */
+    private static Map<TopicPartition, OffsetAndMetadata> consumerPositions(KafkaConsumer<String, String> consumer) {
+        Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>();
+        for (TopicPartition topicPartition : consumer.assignment()) {
+            positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
+        }
+        return positions;
+    }
+
+    /**
+     * Offset distance between the consumer's current position and the end offset of {@code partition}, or 0 when
+     * no end offset is returned for it.
+     */
+    private static long messagesRemaining(KafkaConsumer<String, String> consumer, TopicPartition partition) {
+        long currentPosition = consumer.position(partition);
+        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Arrays.asList(partition));
+        if (endOffsets.containsKey(partition)) {
+            return endOffsets.get(partition) - currentPosition;
+        }
+        return 0;
+    }
+
+    /**
+     * Sends records polled from {@code consumer} to {@code outputTopic} until {@code count} of them have been sent
+     * in the producer's current transaction. When a poll returns more records than are still needed, the consumer
+     * is rewound to the first unsent record. Neither the transaction size nor the overall message limit is then
+     * exceeded, and the consumer position (which the caller commits) stops right after the last record sent.
+     *
+     * <p>NOTE(review): {@code count} comes from offset differences (see {@link #messagesRemaining}). If the input
+     * holds offsets that are never returned as records, this keeps polling without returning. Confirm the input
+     * partition is written non-transactionally.
+     *
+     * @return the number of records sent ({@code count}, when {@code count} is positive)
+     * @throws WakeupException if {@code consumer.wakeup()} was called, which only the shutdown hook does
+     */
+    private static int copyRecords(KafkaConsumer<String, String> consumer, KafkaProducer<String, String> producer,
+                                   String outputTopic, long count) {
+        int sent = 0;
+        while (sent < count) {
+            ConsumerRecords<String, String> records = consumer.poll(200L);
+            for (ConsumerRecord<String, String> record : records) {
+                if (sent >= count) {
+                    // Leave this and later records for the next transaction.
+                    consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
+                    break;
+                }
+                producer.send(producerRecordFromConsumerRecord(outputTopic, record));
+                sent++;
+            }
+        }
+        return sent;
+    }
+
+    /** Waits for {@code latch} to reach zero, restoring the thread's interrupt status if it was interrupted. */
+    private static void awaitUninterruptibly(CountDownLatch latch) {
+        boolean interrupted = false;
+        while (true) {
+            try {
+                latch.await();
+                break;
+            } catch (InterruptedException e) {
+                interrupted = true;
+            }
+        }
+        if (interrupted)
+            Thread.currentThread().interrupt();
+    }
+
+    /** Serializes {@code data} to JSON, or returns an explanatory message if it cannot be serialized. */
+    private static String toJsonString(Map<String, Object> data) {
+        String json;
+        try {
+            json = MAPPER.writeValueAsString(data);
+        } catch (JsonProcessingException e) {
+            json = "Bad data can't be written as json: " + e.getMessage();
+        }
+        return json;
+    }
+
+    /** Status line: {@code progress} carries the transactional id. */
+    private static String statusAsJson(long consumed, long remaining, String transactionalId) {
+        Map<String, Object> statusData = new HashMap<>();
+        statusData.put("progress", transactionalId);
+        statusData.put("consumed", consumed);
+        statusData.put("remaining", remaining);
+        return toJsonString(statusData);
+    }
+
+    /** Final line printed by the shutdown hook: {@code shutdown_complete} carries the transactional id. */
+    private static String shutDownString(long consumed, long remaining, String transactionalId) {
+        Map<String, Object> shutdownData = new HashMap<>();
+        shutdownData.put("remaining", remaining);
+        shutdownData.put("consumed", consumed);
+        shutdownData.put("shutdown_complete", transactionalId);
+        return toJsonString(shutdownData);
+    }
+
+    public static void main(String[] args) throws IOException {
+        Namespace parsedArgs = argParser().parseArgsOrFail(args);
+        Integer numMessagesPerTransaction = parsedArgs.getInt("messagesPerTransaction");
+        final String transactionalId = parsedArgs.getString("transactionalId");
+        final String outputTopic = parsedArgs.getString("outputTopic");
+
+        String consumerGroup = parsedArgs.getString("consumerGroup");
+        TopicPartition inputPartition = new TopicPartition(parsedArgs.getString("inputTopic"), parsedArgs.getInt("inputPartition"));
+
+        final KafkaProducer<String, String> producer = createProducer(parsedArgs);
+        final KafkaConsumer<String, String> consumer = createConsumer(parsedArgs, inputPartition);
+
+        consumer.assign(Arrays.asList(inputPartition));
+
+        int maxMessagesArg = parsedArgs.getInt("maxMessages");
+        long maxMessages = maxMessagesArg == -1 ? Long.MAX_VALUE : maxMessagesArg;
+        // Never copy past the end offset observed at launch.
+        maxMessages = Math.min(messagesRemaining(consumer, inputPartition), maxMessages);
+
+        producer.initTransactions();
+
+        final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
+        final AtomicLong remainingMessages = new AtomicLong(maxMessages);
+        final AtomicLong numMessagesProcessed = new AtomicLong(0);
+        // Counted down by the main thread once it has closed both clients.
+        final CountDownLatch mainThreadDone = new CountDownLatch(1);
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                isShuttingDown.set(true);
+                if (mainThreadDone.getCount() > 0) {
+                    // KafkaConsumer is not thread-safe: wakeup() is the only method that may be called from
+                    // another thread. It makes the main thread's current or next poll() throw WakeupException.
+                    // The main thread then stops and closes (flushing) both clients itself.
+                    consumer.wakeup();
+                }
+                awaitUninterruptibly(mainThreadDone);
+                System.out.println(shutDownString(numMessagesProcessed.get(), remainingMessages.get(), transactionalId));
+            }
+        });
+
+        try {
+            long lastReportedDecile = -1;
+            while (0 < remainingMessages.get()) {
+                // Print status for every 10% we progress. Integer deciles are compared because an exact
+                // floating-point "% 10 == 0" test only fires when a commit lands exactly on a multiple of 10%.
+                long decile = numMessagesProcessed.get() * 10 / maxMessages;
+                if (decile > lastReportedDecile) {
+                    System.out.println(statusAsJson(numMessagesProcessed.get(), remainingMessages.get(), transactionalId));
+                    lastReportedDecile = decile;
+                }
+                if (isShuttingDown.get())
+                    break;
+                long numMessagesForNextTransaction = Math.min(numMessagesPerTransaction, remainingMessages.get());
+                producer.beginTransaction();
+                int messagesInCurrentTransaction = copyRecords(consumer, producer, outputTopic, numMessagesForNextTransaction);
+                producer.sendOffsetsToTransaction(consumerPositions(consumer), consumerGroup);
+                producer.commitTransaction();
+                remainingMessages.set(maxMessages - numMessagesProcessed.addAndGet(messagesInCurrentTransaction));
+            }
+        } catch (WakeupException e) {
+            // Only the shutdown hook calls wakeup(): stop copying and leave the open transaction uncommitted.
+            if (!isShuttingDown.get())
+                throw e;
+        } finally {
+            try {
+                producer.close();
+            } finally {
+                try {
+                    consumer.close();
+                } finally {
+                    mainThreadDone.countDown();
+                }
+            }
+        }
+        System.exit(0);
+    }
+}