You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/23 04:23:44 UTC
kafka git commit: KAFKA-2439;
Add MirrorMaker service class for system tests
Repository: kafka
Updated Branches:
refs/heads/trunk 6acd37720 -> 1d2ae89c5
KAFKA-2439; Add MirrorMaker service class for system tests
Added MirrorMaker service and a few corresponding sanity checks, as well as necessary config template files. A few additional updates to accommodate the change in wait_until from ducktape 0.2.0 -> 0.3.0
Author: Geoff Anderson <ge...@confluent.io>
Reviewers: Ewen Cheslack-Postava, Gwen Shapira
Closes #148 from granders/KAFKA-2439 and squashes the following commits:
c7c3ebd [Geoff Anderson] MirrorMaker now can run as multi-node service. Added kill -9 to various clean_node methods.
1e806f2 [Geoff Anderson] Various cleanups per review.
1b4b049 [Geoff Anderson] Added MirrorMaker service and a few corresponding sanity checks, as well as necessary config template files. A few additional updates to accommodate the change in wait_until from ducktape 0.2.0 -> 0.3.0
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d2ae89c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d2ae89c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d2ae89c
Branch: refs/heads/trunk
Commit: 1d2ae89c5a1dc5d18b8188bf737a8e1d195be325
Parents: 6acd377
Author: Geoff Anderson <ge...@confluent.io>
Authored: Sat Aug 22 19:23:36 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Sat Aug 22 19:23:36 2015 -0700
----------------------------------------------------------------------
.../sanity_checks/test_console_consumer.py | 12 +-
.../sanity_checks/test_mirror_maker.py | 90 ++++++++++
tests/kafkatest/services/console_consumer.py | 17 +-
tests/kafkatest/services/kafka.py | 1 +
tests/kafkatest/services/mirror_maker.py | 165 +++++++++++++++++++
.../templates/console_consumer.properties | 4 +-
.../templates/console_consumer_log4j.properties | 26 ---
.../services/templates/consumer.properties | 23 +++
.../services/templates/kafka.properties | 80 ---------
.../services/templates/producer.properties | 28 ++++
.../services/templates/tools_log4j.properties | 26 +++
tests/kafkatest/services/verifiable_producer.py | 4 +
tests/kafkatest/services/zookeeper.py | 16 ++
tests/kafkatest/tests/replication_test.py | 8 +-
tests/setup.py | 2 +-
15 files changed, 379 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/sanity_checks/test_console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index cd8c8f9..3e523e1 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -61,20 +61,20 @@ class ConsoleConsumerTest(Test):
self.consumer.start()
node = self.consumer.nodes[0]
- if not wait_until(lambda: self.consumer.alive(node), timeout_sec=10, backoff_sec=.2):
- raise Exception("Consumer was too slow to start")
+ wait_until(lambda: self.consumer.alive(node),
+ timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
self.logger.info("consumer started in %s seconds " % str(time.time() - t0))
# Verify that log output is happening
- if not wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10):
- raise Exception("Timed out waiting for log file to exist")
+ wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10,
+ err_msg="Timed out waiting for logging to start.")
assert line_count(node, ConsoleConsumer.LOG_FILE) > 0
# Verify no consumed messages
assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0
self.consumer.stop_node(node)
- if not wait_until(lambda: not self.consumer.alive(node), timeout_sec=10, backoff_sec=.2):
- raise Exception("Took too long for consumer to die.")
+
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/sanity_checks/test_mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_mirror_maker.py b/tests/kafkatest/sanity_checks/test_mirror_maker.py
new file mode 100644
index 0000000..3481d7a
--- /dev/null
+++ b/tests/kafkatest/sanity_checks/test_mirror_maker.py
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.tests.test import Test
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.mirror_maker import MirrorMaker
+
+
+class TestMirrorMakerService(Test):
+ """Sanity checks on mirror maker service class."""
+ def __init__(self, test_context):
+ super(TestMirrorMakerService, self).__init__(test_context)
+
+ self.topic = "topic"
+ self.source_zk = ZookeeperService(test_context, num_nodes=1)
+ self.target_zk = ZookeeperService(test_context, num_nodes=1)
+
+ self.source_kafka = KafkaService(test_context, num_nodes=1, zk=self.source_zk,
+ topics={self.topic: {"partitions": 1, "replication-factor": 1}})
+ self.target_kafka = KafkaService(test_context, num_nodes=1, zk=self.target_zk,
+ topics={self.topic: {"partitions": 1, "replication-factor": 1}})
+
+ self.num_messages = 1000
+ # This will produce to source kafka cluster
+ self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.source_kafka, topic=self.topic,
+ max_messages=self.num_messages, throughput=1000)
+
+ # Use a regex whitelist to check that the start command is well-formed in this case
+ self.mirror_maker = MirrorMaker(test_context, num_nodes=1, source=self.source_kafka, target=self.target_kafka,
+ whitelist=".*", consumer_timeout_ms=2000)
+
+ # This will consume from target kafka cluster
+ self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.target_kafka, topic=self.topic,
+ consumer_timeout_ms=1000)
+
+ def setUp(self):
+ # Source cluster
+ self.source_zk.start()
+ self.source_kafka.start()
+
+ # Target cluster
+ self.target_zk.start()
+ self.target_kafka.start()
+
+ def test_end_to_end(self):
+ """
+ Test end-to-end behavior under non-failure conditions.
+
+ Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster.
+ One is source, and the other is target. Single-node mirror maker mirrors from source to target.
+
+ - Start mirror maker.
+ - Produce a small number of messages to the source cluster.
+ - Consume messages from target.
+ - Verify that number of consumed messages matches the number produced.
+ """
+ self.mirror_maker.start()
+ # Check that consumer_timeout_ms setting made it to config file
+ self.mirror_maker.nodes[0].account.ssh(
+ "grep \"consumer\.timeout\.ms\" %s" % MirrorMaker.CONSUMER_CONFIG, allow_fail=False)
+
+ self.producer.start()
+ self.producer.wait()
+ self.consumer.start()
+ self.consumer.wait()
+
+ num_consumed = len(self.consumer.messages_consumed[1])
+ num_produced = self.producer.num_acked
+ assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)
+ assert num_produced == num_consumed, "num_produced: %d, num_consumed: %d" % (num_produced, num_consumed)
+
+ self.mirror_maker.stop()
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 18c9f63..ffde6a2 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -14,8 +14,10 @@
# limitations under the License.
from ducktape.services.background_thread import BackgroundThreadService
+from ducktape.utils.util import wait_until
import os
+import subprocess
def is_int(msg):
@@ -91,7 +93,7 @@ class ConsoleConsumer(BackgroundThreadService):
"collect_default": True}
}
- def __init__(self, context, num_nodes, kafka, topic, message_validator=is_int, from_beginning=True, consumer_timeout_ms=None):
+ def __init__(self, context, num_nodes, kafka, topic, message_validator=None, from_beginning=True, consumer_timeout_ms=None):
"""
Args:
context: standard context
@@ -141,7 +143,7 @@ class ConsoleConsumer(BackgroundThreadService):
cmd = "ps ax | grep -i console_consumer | grep java | grep -v grep | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
- except:
+ except (subprocess.CalledProcessError, ValueError) as e:
return []
def alive(self, node):
@@ -161,7 +163,7 @@ class ConsoleConsumer(BackgroundThreadService):
node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
# Create and upload log properties
- log_config = self.render('console_consumer_log4j.properties', log_file=ConsoleConsumer.LOG_FILE)
+ log_config = self.render('tools_log4j.properties', log_file=ConsoleConsumer.LOG_FILE)
node.account.create_file(ConsoleConsumer.LOG4J_CONFIG, log_config)
# Run and capture output
@@ -169,7 +171,8 @@ class ConsoleConsumer(BackgroundThreadService):
self.logger.debug("Console consumer %d command: %s", idx, cmd)
for line in node.account.ssh_capture(cmd, allow_fail=False):
msg = line.strip()
- msg = self.message_validator(msg)
+ if self.message_validator is not None:
+ msg = self.message_validator(msg)
if msg is not None:
self.logger.debug("consumed a message: " + str(msg))
self.messages_consumed[idx].append(msg)
@@ -179,7 +182,13 @@ class ConsoleConsumer(BackgroundThreadService):
def stop_node(self, node):
node.account.kill_process("java", allow_fail=True)
+ wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.2,
+ err_msg="Timed out waiting for consumer to stop.")
def clean_node(self, node):
+ if self.alive(node):
+ self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
+ (self.__class__.__name__, node.account))
+ node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka.py b/tests/kafkatest/services/kafka.py
index 34ec5ef..76f9cf6 100644
--- a/tests/kafkatest/services/kafka.py
+++ b/tests/kafkatest/services/kafka.py
@@ -93,6 +93,7 @@ class KafkaService(Service):
node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False)
def clean_node(self, node):
+ node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False)
def create_topic(self, topic_cfg):
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
new file mode 100644
index 0000000..afbed13
--- /dev/null
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -0,0 +1,165 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.service import Service
+from ducktape.utils.util import wait_until
+
+import os
+import subprocess
+
+"""
+0.8.2.1 MirrorMaker options
+
+Option Description
+------ -----------
+--abort.on.send.failure <Stop the Configure the mirror maker to exit on
+ entire mirror maker when a send a failed send. (default: true)
+ failure occurs>
+--blacklist <Java regex (String)> Blacklist of topics to mirror.
+--consumer.config <config file> Embedded consumer config for consuming
+ from the source cluster.
+--consumer.rebalance.listener <A The consumer rebalance listener to use
+ custom rebalance listener of type for mirror maker consumer.
+ ConsumerRebalanceListener>
+--help Print this message.
+--message.handler <A custom message Message handler which will process
+ handler of type every record in-between consumer and
+ MirrorMakerMessageHandler> producer.
+--message.handler.args <Arguments Arguments used by custom rebalance
+ passed to message handler listener for mirror maker consumer
+ constructor.>
+--num.streams <Integer: Number of Number of consumption streams.
+ threads> (default: 1)
+--offset.commit.interval.ms <Integer: Offset commit interval in ms (default:
+ offset commit interval in 60000)
+ millisecond>
+--producer.config <config file> Embedded producer config.
+--rebalance.listener.args <Arguments Arguments used by custom rebalance
+ passed to custom rebalance listener listener for mirror maker consumer
+ constructor as a string.>
+--whitelist <Java regex (String)> Whitelist of topics to mirror.
+"""
+
+
+class MirrorMaker(Service):
+
+ # Root directory for persistent output
+ PERSISTENT_ROOT = "/mnt/mirror_maker"
+ LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+ LOG_FILE = os.path.join(LOG_DIR, "mirror_maker.log")
+ LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+ PRODUCER_CONFIG = os.path.join(PERSISTENT_ROOT, "producer.properties")
+ CONSUMER_CONFIG = os.path.join(PERSISTENT_ROOT, "consumer.properties")
+ KAFKA_HOME = "/opt/kafka/"
+
+ logs = {
+ "mirror_maker_log": {
+ "path": LOG_FILE,
+ "collect_default": True}
+ }
+
+ def __init__(self, context, num_nodes, source, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None):
+ """
+ MirrorMaker mirrors messages from one or more source clusters to a single destination cluster.
+
+ Args:
+ context: standard context
+ source: source Kafka cluster
+ target: target Kafka cluster to which data will be mirrored
+ whitelist: whitelist regex for topics to mirror
+ blacklist: blacklist regex for topics not to mirror
+ num_streams: number of consumer threads to create; can be a single int, or a list with
+ one value per node, allowing num_streams to be the same for each node,
+ or configured independently per-node
+ consumer_timeout_ms: consumer stops if t > consumer_timeout_ms elapses between consecutive messages
+ """
+ super(MirrorMaker, self).__init__(context, num_nodes=num_nodes)
+
+ self.consumer_timeout_ms = consumer_timeout_ms
+ self.num_streams = num_streams
+ if not isinstance(num_streams, int):
+ # if not an integer, num_streams should be configured per-node
+ assert len(num_streams) == num_nodes
+ self.whitelist = whitelist
+ self.blacklist = blacklist
+ self.source = source
+ self.target = target
+
+ def start_cmd(self, node):
+ cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
+ cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
+ cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % MirrorMaker.KAFKA_HOME
+ cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG
+ cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG
+ if isinstance(self.num_streams, int):
+ cmd += " --num.streams %d" % self.num_streams
+ else:
+ # config num_streams separately on each node
+ cmd += " --num.streams %d" % self.num_streams[self.idx(node) - 1]
+ if self.whitelist is not None:
+ cmd += " --whitelist=\"%s\"" % self.whitelist
+ if self.blacklist is not None:
+ cmd += " --blacklist=\"%s\"" % self.blacklist
+ cmd += " 1>> %s 2>> %s &" % (MirrorMaker.LOG_FILE, MirrorMaker.LOG_FILE)
+ return cmd
+
+ def pids(self, node):
+ try:
+ cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'"
+ pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
+ return pid_arr
+ except (subprocess.CalledProcessError, ValueError) as e:
+ return []
+
+ def alive(self, node):
+ return len(self.pids(node)) > 0
+
+ def start_node(self, node):
+ node.account.ssh("mkdir -p %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
+ node.account.ssh("mkdir -p %s" % MirrorMaker.LOG_DIR, allow_fail=False)
+
+ # Create, upload one consumer config file for source cluster
+ consumer_props = self.render('consumer.properties', zookeeper_connect=self.source.zk.connect_setting())
+ node.account.create_file(MirrorMaker.CONSUMER_CONFIG, consumer_props)
+
+ # Create, upload producer properties file for target cluster
+ producer_props = self.render('producer.properties', broker_list=self.target.bootstrap_servers(),
+ producer_type="async")
+ node.account.create_file(MirrorMaker.PRODUCER_CONFIG, producer_props)
+
+ # Create and upload log properties
+ log_config = self.render('tools_log4j.properties', log_file=MirrorMaker.LOG_FILE)
+ node.account.create_file(MirrorMaker.LOG4J_CONFIG, log_config)
+
+ # Run mirror maker
+ cmd = self.start_cmd(node)
+ self.logger.debug("Mirror maker command: %s", cmd)
+ node.account.ssh(cmd, allow_fail=False)
+ wait_until(lambda: self.alive(node), timeout_sec=10, backoff_sec=.5,
+ err_msg="Mirror maker took to long to start.")
+ self.logger.debug("Mirror maker is alive")
+
+ def stop_node(self, node):
+ node.account.kill_process("java", allow_fail=True)
+ wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.5,
+ err_msg="Mirror maker took to long to stop.")
+
+ def clean_node(self, node):
+ if self.alive(node):
+ self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
+ (self.__class__.__name__, node.account))
+ node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
+ node.account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/templates/console_consumer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/console_consumer.properties b/tests/kafkatest/services/templates/console_consumer.properties
index 944c2c9..7143179 100644
--- a/tests/kafkatest/services/templates/console_consumer.properties
+++ b/tests/kafkatest/services/templates/console_consumer.properties
@@ -14,6 +14,6 @@
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
-{% if consumer_timeout_ms is not none %}
+{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}
consumer.timeout.ms={{ consumer_timeout_ms }}
-{% endif %}
\ No newline at end of file
+{% endif %}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/templates/console_consumer_log4j.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/console_consumer_log4j.properties b/tests/kafkatest/services/templates/console_consumer_log4j.properties
deleted file mode 100644
index e63e6d6..0000000
--- a/tests/kafkatest/services/templates/console_consumer_log4j.properties
+++ /dev/null
@@ -1,26 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Define the root logger with appender file
-log4j.rootLogger = INFO, FILE
-
-log4j.appender.FILE=org.apache.log4j.FileAppender
-log4j.appender.FILE.File={{ log_file }}
-log4j.appender.FILE.ImmediateFlush=true
-log4j.appender.FILE.Threshold=debug
-# Set the append to false, overwrite
-log4j.appender.FILE.Append=false
-log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
-log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/templates/consumer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/consumer.properties b/tests/kafkatest/services/templates/consumer.properties
new file mode 100644
index 0000000..b8723d1
--- /dev/null
+++ b/tests/kafkatest/services/templates/consumer.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.consumer.ConsumerConfig for more details
+
+zookeeper.connect={{ zookeeper_connect }}
+zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }}
+group.id={{ group_id|default('test-consumer-group') }}
+
+{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}
+consumer.timeout.ms={{ consumer_timeout_ms }}
+{% endif %}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/kafka.properties b/tests/kafkatest/services/templates/kafka.properties
index db1077a..6650d23 100644
--- a/tests/kafkatest/services/templates/kafka.properties
+++ b/tests/kafkatest/services/templates/kafka.properties
@@ -14,108 +14,28 @@
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
-############################# Server Basics #############################
-# The id of the broker. This must be set to a unique integer for each broker.
broker.id={{ broker_id }}
-
-############################# Socket Server Settings #############################
-
-# The port the socket server listens on
port=9092
-
-# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost
-
-# Hostname the broker will advertise to producers and consumers. If not set, it uses the
-# value for "host.name" if configured. Otherwise, it will use the value returned from
-# java.net.InetAddress.getCanonicalHostName().
advertised.host.name={{ node.account.hostname }}
-
-# The port to publish to ZooKeeper for clients to use. If this is not set,
-# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>
-
-# The number of threads handling network requests
num.network.threads=3
-
-# The number of threads doing disk I/O
num.io.threads=8
-
-# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
-
-# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=65536
-
-# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
-
-############################# Log Basics #############################
-
-# A comma seperated list of directories under which to store log files
log.dirs=/mnt/kafka-logs
-
-# The default number of log partitions per topic. More partitions allow greater
-# parallelism for consumption, but this will also result in more files across
-# the brokers.
num.partitions=1
-
-# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
-# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
-
-############################# Log Flush Policy #############################
-
-# Messages are immediately written to the filesystem but by default we only fsync() to sync
-# the OS cache lazily. The following configurations control the flush of data to disk.
-# There are a few important trade-offs here:
-# 1. Durability: Unflushed data may be lost if you are not using replication.
-# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
-# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
-# The settings below allow one to configure the flush policy to flush data after a period of time or
-# every N messages (or both). This can be done globally and overridden on a per-topic basis.
-
-# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
-
-# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
-
-############################# Log Retention Policy #############################
-
-# The following configurations control the disposal of log segments. The policy can
-# be set to delete segments after a period of time, or after a given size has accumulated.
-# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
-# from the end of the log.
-
-# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
-
-# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
-# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
-
-# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
-
-# The interval at which log segments are checked to see if they can be deleted according
-# to the retention policies
log.retention.check.interval.ms=300000
-
-# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
-# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
-############################# Zookeeper #############################
-
-# Zookeeper connection string (see zookeeper docs for details).
-# This is a comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
-# You can also append an optional chroot string to the urls to specify the
-# root directory for all kafka znodes.
zookeeper.connect={{ zk.connect_setting() }}
-
-# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=2000
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/templates/producer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/producer.properties b/tests/kafkatest/services/templates/producer.properties
new file mode 100644
index 0000000..ede60c8
--- /dev/null
+++ b/tests/kafkatest/services/templates/producer.properties
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.producer.ProducerConfig for more details
+
+metadata.broker.list={{ broker_list }}
+bootstrap.servers = {{ broker_list }}
+producer.type={{ producer_type }} # sync or async
+compression.codec=none
+serializer.class=kafka.serializer.DefaultEncoder
+
+#partitioner.class=
+#compressed.topics=
+#queue.buffering.max.ms=
+#queue.buffering.max.messages=
+#queue.enqueue.timeout.ms=
+#batch.num.messages=
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/templates/tools_log4j.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/tools_log4j.properties b/tests/kafkatest/services/templates/tools_log4j.properties
new file mode 100644
index 0000000..e63e6d6
--- /dev/null
+++ b/tests/kafkatest/services/templates/tools_log4j.properties
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define the root logger with appender file
+log4j.rootLogger = INFO, FILE
+
+log4j.appender.FILE=org.apache.log4j.FileAppender
+log4j.appender.FILE.File={{ log_file }}
+log4j.appender.FILE.ImmediateFlush=true
+log4j.appender.FILE.Threshold=debug
+# Set the append to false, overwrite
+log4j.appender.FILE.Append=false
+log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index cca8227..158db7a 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -89,12 +89,16 @@ class VerifiableProducer(BackgroundThreadService):
def stop_node(self, node):
node.account.kill_process("VerifiableProducer", allow_fail=False)
+ if self.worker_threads is None:
+ return
+
# block until the corresponding thread exits
if len(self.worker_threads) >= self.idx(node):
# Need to guard this because stop is preemptively called before the worker threads are added and started
self.worker_threads[self.idx(node) - 1].join()
def clean_node(self, node):
+ node.account.kill_process("VerifiableProducer", clean_shutdown=False, allow_fail=False)
node.account.ssh("rm -rf /mnt/producer.log", allow_fail=False)
def try_parse_json(self, string):
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/zookeeper.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index 56f4606..09bec35 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -16,6 +16,7 @@
from ducktape.services.service import Service
+import subprocess
import time
@@ -51,6 +52,17 @@ class ZookeeperService(Service):
time.sleep(5) # give it some time to start
+ def pids(self, node):
+ try:
+ cmd = "ps ax | grep -i zookeeper | grep java | grep -v grep | awk '{print $1}'"
+ pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
+ return pid_arr
+ except (subprocess.CalledProcessError, ValueError) as e:
+ return []
+
+ def alive(self, node):
+ return len(self.pids(node)) > 0
+
def stop_node(self, node):
idx = self.idx(node)
self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
@@ -58,6 +70,10 @@ class ZookeeperService(Service):
def clean_node(self, node):
self.logger.info("Cleaning ZK node %d on %s", self.idx(node), node.account.hostname)
+ if self.alive(node):
+ self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
+ (self.__class__.__name__, node.account))
+ node.account.kill_process("zookeeper", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=False)
def connect_setting(self):
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/tests/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py
index fed1ea1..755fb42 100644
--- a/tests/kafkatest/tests/replication_test.py
+++ b/tests/kafkatest/tests/replication_test.py
@@ -19,7 +19,7 @@ from ducktape.utils.util import wait_until
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.console_consumer import ConsoleConsumer, is_int
import signal
import time
@@ -76,12 +76,12 @@ class ReplicationTest(Test):
"""
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
- self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000)
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000, message_validator=is_int)
# Produce in a background thread while driving broker failures
self.producer.start()
- if not wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5):
- raise RuntimeError("Producer failed to start in a reasonable amount of time.")
+ wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5,
+ err_msg="Producer failed to start in a reasonable amount of time.")
failure()
self.producer.stop()
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/setup.py
----------------------------------------------------------------------
diff --git a/tests/setup.py b/tests/setup.py
index 5ce4bb7..a2fa71a 100644
--- a/tests/setup.py
+++ b/tests/setup.py
@@ -23,5 +23,5 @@ setup(name="kafkatest",
platforms=["any"],
license="apache2.0",
packages=find_packages(),
- requires=["ducktape(>=0.2.0)"]
+ requires=["ducktape(==0.3.0)"]
)