You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/04/13 22:51:12 UTC
kafka git commit: KAFKA-3490; Multiple version support for ducktape performance tests
Repository: kafka
Updated Branches:
refs/heads/trunk 667ff7ef7 -> c1694833d
KAFKA-3490; Multiple version support for ducktape performance tests
Author: Ismael Juma <is...@juma.me.uk>
Author: Geoff Anderson <ge...@confluent.io>
Reviewers: Geoff Anderson <ge...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1173 from ijuma/kafka-3490-multiple-version-support-perf-tests
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c1694833
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c1694833
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c1694833
Branch: refs/heads/trunk
Commit: c1694833d5c095e47e5767f38c3e85bbe927a0a7
Parents: 667ff7e
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed Apr 13 13:50:49 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Wed Apr 13 13:50:49 2016 -0700
----------------------------------------------------------------------
.../kafkatest/benchmarks/core/benchmark_test.py | 85 +++++-----
.../sanity_checks/test_performance_services.py | 88 ++++++++++
.../kafkatest/services/performance/__init__.py | 4 +-
.../performance/consumer_performance.py | 45 ++++--
.../services/performance/end_to_end_latency.py | 62 ++++++--
.../services/performance/performance.py | 23 +++
.../performance/producer_performance.py | 159 ++++++++++++++-----
.../services/templates/tools_log4j.properties | 2 +-
vagrant/base.sh | 2 +
9 files changed, 364 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/benchmarks/core/benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py
index 9c2e32d..d252e5d 100644
--- a/tests/kafkatest/benchmarks/core/benchmark_test.py
+++ b/tests/kafkatest/benchmarks/core/benchmark_test.py
@@ -20,7 +20,8 @@ from ducktape.mark import matrix
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
-from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService
+from kafkatest.services.kafka.version import TRUNK, KafkaVersion
+from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService, throughput, latency, compute_aggregate_throughput
TOPIC_REP_ONE = "topic-replication-factor-one"
@@ -54,11 +55,12 @@ class Benchmark(Test):
def setUp(self):
self.zk.start()
- def start_kafka(self, security_protocol, interbroker_security_protocol):
+ def start_kafka(self, security_protocol, interbroker_security_protocol, version):
self.kafka = KafkaService(
self.test_context, self.num_brokers,
self.zk, security_protocol=security_protocol,
- interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
+ interbroker_security_protocol=interbroker_security_protocol, topics=self.topics,
+ version=version)
self.kafka.log_level = "INFO" # We don't need DEBUG logging here
self.kafka.start()
@@ -67,7 +69,8 @@ class Benchmark(Test):
@parametrize(acks=-1, topic=TOPIC_REP_THREE)
@parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
@matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], security_protocol=['PLAINTEXT', 'SSL'])
- def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT'):
+ def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT',
+ client_version=str(TRUNK), broker_version=str(TRUNK)):
"""
Setup: 1 node zk + 3 node kafka cluster
Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor,
@@ -76,13 +79,16 @@ class Benchmark(Test):
Collect and return aggregate throughput statistics after all messages have been acknowledged.
(This runs ProducerPerformance.java under the hood)
"""
- self.start_kafka(security_protocol, security_protocol)
+ client_version = KafkaVersion(client_version)
+ broker_version = KafkaVersion(broker_version)
+ self.validate_versions(client_version, broker_version)
+ self.start_kafka(security_protocol, security_protocol, broker_version)
# Always generate the same total amount of data
nrecords = int(self.target_data_size / message_size)
self.producer = ProducerPerformanceService(
self.test_context, num_producers, self.kafka, topic=topic,
- num_records=nrecords, record_size=message_size, throughput=-1,
+ num_records=nrecords, record_size=message_size, throughput=-1, version=client_version,
settings={
'acks': acks,
'batch.size': self.batch_size,
@@ -92,7 +98,8 @@ class Benchmark(Test):
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
@matrix(security_protocol=['PLAINTEXT', 'SSL'])
- def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol=None):
+ def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol=None,
+ client_version=str(TRUNK), broker_version=str(TRUNK)):
"""
Setup: 1 node zk + 3 node kafka cluster
Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.
@@ -101,13 +108,16 @@ class Benchmark(Test):
(This runs ProducerPerformance.java under the hood)
"""
+ client_version = KafkaVersion(client_version)
+ broker_version = KafkaVersion(broker_version)
+ self.validate_versions(client_version, broker_version)
if interbroker_security_protocol is None:
interbroker_security_protocol = security_protocol
- self.start_kafka(security_protocol, interbroker_security_protocol)
+ self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
self.producer = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
- throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
+ throughput=-1, version=client_version, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
intermediate_stats=True
)
self.producer.run()
@@ -135,10 +145,10 @@ class Benchmark(Test):
self.logger.info("\n".join(summary))
return data
-
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
@matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
- def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol=None):
+ def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol=None,
+ client_version=str(TRUNK), broker_version=str(TRUNK)):
"""
Setup: 1 node zk + 3 node kafka cluster
Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3,
@@ -148,13 +158,16 @@ class Benchmark(Test):
(Under the hood, this simply runs EndToEndLatency.scala)
"""
+ client_version = KafkaVersion(client_version)
+ broker_version = KafkaVersion(broker_version)
+ self.validate_versions(client_version, broker_version)
if interbroker_security_protocol is None:
interbroker_security_protocol = security_protocol
- self.start_kafka(security_protocol, interbroker_security_protocol)
+ self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
self.logger.info("BENCHMARK: End to end latency")
self.perf = EndToEndLatencyService(
self.test_context, 1, self.kafka,
- topic=TOPIC_REP_THREE, num_records=10000
+ topic=TOPIC_REP_THREE, num_records=10000, version=client_version
)
self.perf.run()
return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
@@ -162,7 +175,8 @@ class Benchmark(Test):
@parametrize(security_protocol='PLAINTEXT', new_consumer=False)
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
@matrix(security_protocol=['PLAINTEXT', 'SSL'])
- def test_producer_and_consumer(self, security_protocol, interbroker_security_protocol=None, new_consumer=True):
+ def test_producer_and_consumer(self, security_protocol, interbroker_security_protocol=None, new_consumer=True,
+ client_version=str(TRUNK), broker_version=str(TRUNK)):
"""
Setup: 1 node zk + 3 node kafka cluster
Concurrently produce and consume 10e6 messages with a single producer and a single consumer,
@@ -172,15 +186,18 @@ class Benchmark(Test):
(Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala)
"""
+ client_version = KafkaVersion(client_version)
+ broker_version = KafkaVersion(broker_version)
+ self.validate_versions(client_version, broker_version)
if interbroker_security_protocol is None:
interbroker_security_protocol = security_protocol
- self.start_kafka(security_protocol, interbroker_security_protocol)
+ self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
num_records = 10 * 1000 * 1000 # 10e6
self.producer = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic=TOPIC_REP_THREE,
- num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
+ num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, version=client_version,
settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
)
self.consumer = ConsumerPerformanceService(
@@ -200,21 +217,25 @@ class Benchmark(Test):
@parametrize(security_protocol='PLAINTEXT', new_consumer=False)
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
@matrix(security_protocol=['PLAINTEXT', 'SSL'])
- def test_consumer_throughput(self, security_protocol, interbroker_security_protocol=None, new_consumer=True, num_consumers=1):
+ def test_consumer_throughput(self, security_protocol, interbroker_security_protocol=None, new_consumer=True, num_consumers=1,
+ client_version=str(TRUNK), broker_version=str(TRUNK)):
"""
Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions
(using new consumer iff new_consumer == True), and report throughput.
"""
+ client_version = KafkaVersion(client_version)
+ broker_version = KafkaVersion(broker_version)
+ self.validate_versions(client_version, broker_version)
if interbroker_security_protocol is None:
interbroker_security_protocol = security_protocol
- self.start_kafka(security_protocol, interbroker_security_protocol)
+ self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
num_records = 10 * 1000 * 1000 # 10e6
# seed kafka w/messages
self.producer = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic=TOPIC_REP_THREE,
- num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
+ num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, version=client_version,
settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
)
self.producer.run()
@@ -227,27 +248,5 @@ class Benchmark(Test):
self.consumer.run()
return compute_aggregate_throughput(self.consumer)
-
-def throughput(records_per_sec, mb_per_sec):
- """Helper method to ensure uniform representation of throughput data"""
- return {
- "records_per_sec": records_per_sec,
- "mb_per_sec": mb_per_sec
- }
-
-
-def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms):
- """Helper method to ensure uniform representation of latency data"""
- return {
- "latency_50th_ms": latency_50th_ms,
- "latency_99th_ms": latency_99th_ms,
- "latency_999th_ms": latency_999th_ms
- }
-
-
-def compute_aggregate_throughput(perf):
- """Helper method for computing throughput after running a performance service."""
- aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
- aggregate_mbps = sum([r['mbps'] for r in perf.results])
-
- return throughput(aggregate_rate, aggregate_mbps)
+ def validate_versions(self, client_version, broker_version):
+ assert client_version <= broker_version, "Client version %s should be <= than broker version %s" % (client_version, broker_version)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/sanity_checks/test_performance_services.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_performance_services.py b/tests/kafkatest/sanity_checks/test_performance_services.py
new file mode 100644
index 0000000..16d5d32
--- /dev/null
+++ b/tests/kafkatest/sanity_checks/test_performance_services.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark import parametrize
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2, LATEST_0_9, KafkaVersion
+from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService
+from kafkatest.services.performance import latency, compute_aggregate_throughput
+
+
+class PerformanceServiceTest(Test):
+ def __init__(self, test_context):
+ super(PerformanceServiceTest, self).__init__(test_context)
+ self.record_size = 100
+ self.num_records = 10000
+ self.topic = "topic"
+
+ self.zk = ZookeeperService(test_context, 1)
+
+ def setUp(self):
+ self.zk.start()
+
+ # We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check,
+ # the overhead should be manageable.
+ @parametrize(version=str(LATEST_0_8_2))
+ @parametrize(version=str(LATEST_0_9), new_consumer=False)
+ @parametrize(version=str(LATEST_0_9), new_consumer=True)
+ @parametrize(version=str(TRUNK), new_consumer=False)
+ @parametrize(version=str(TRUNK), new_consumer=True)
+ def test_version(self, version=str(LATEST_0_9), new_consumer=False):
+ """
+ Sanity check our performance services - verify that we can run each service with a small
+ number of messages. The actual stats here are pretty meaningless since the number of messages is quite small.
+ """
+ version = KafkaVersion(version)
+ self.kafka = KafkaService(
+ self.test_context, 1,
+ self.zk, topics={self.topic: {'partitions': 1, 'replication-factor': 1}}, version=version)
+ self.kafka.start()
+
+ # check basic run of producer performance
+ self.producer_perf = ProducerPerformanceService(
+ self.test_context, 1, self.kafka, topic=self.topic,
+ num_records=self.num_records, record_size=self.record_size,
+ throughput=1000000000, # Set impossibly high for no throttling, for equivalent behavior between 0.8.X and 0.9.X
+ version=version,
+ settings={
+ 'acks': 1,
+ 'batch.size': 8*1024,
+ 'buffer.memory': 64*1024*1024})
+ self.producer_perf.run()
+ producer_perf_data = compute_aggregate_throughput(self.producer_perf)
+
+ # check basic run of end to end latency
+ self.end_to_end = EndToEndLatencyService(
+ self.test_context, 1, self.kafka,
+ topic=self.topic, num_records=self.num_records, version=version)
+ self.end_to_end.run()
+ end_to_end_data = latency(self.end_to_end.results[0]['latency_50th_ms'], self.end_to_end.results[0]['latency_99th_ms'], self.end_to_end.results[0]['latency_999th_ms'])
+
+ # check basic run of consumer performance service
+ self.consumer_perf = ConsumerPerformanceService(
+ self.test_context, 1, self.kafka, new_consumer=new_consumer,
+ topic=self.topic, version=version, messages=self.num_records)
+ self.consumer_perf.group = "test-consumer-group"
+ self.consumer_perf.run()
+ consumer_perf_data = compute_aggregate_throughput(self.consumer_perf)
+
+ return {
+ "producer_performance": producer_perf_data,
+ "end_to_end_latency": end_to_end_data,
+ "consumer_performance": consumer_perf_data
+ }
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/services/performance/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/__init__.py b/tests/kafkatest/services/performance/__init__.py
index a72e3b7..9eddcaa 100644
--- a/tests/kafkatest/services/performance/__init__.py
+++ b/tests/kafkatest/services/performance/__init__.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from performance import PerformanceService
+from performance import PerformanceService, throughput, latency, compute_aggregate_throughput
from end_to_end_latency import EndToEndLatencyService
from producer_performance import ProducerPerformanceService
-from consumer_performance import ConsumerPerformanceService
\ No newline at end of file
+from consumer_performance import ConsumerPerformanceService
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/services/performance/consumer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py
index f8289bc..def27b1 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -14,8 +14,9 @@
# limitations under the License.
from kafkatest.services.performance import PerformanceService
-from kafkatest.services.kafka.directory import kafka_dir
from kafkatest.services.security.security_config import SecurityConfig
+from kafkatest.services.kafka.directory import kafka_dir
+from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0
import os
@@ -69,7 +70,7 @@ class ConsumerPerformanceService(PerformanceService):
"collect_default": True}
}
- def __init__(self, context, num_nodes, kafka, topic, messages, new_consumer=False, settings={}):
+ def __init__(self, context, num_nodes, kafka, topic, messages, version=TRUNK, new_consumer=False, settings={}):
super(ConsumerPerformanceService, self).__init__(context, num_nodes)
self.kafka = kafka
self.security_config = kafka.security_config.client_config()
@@ -78,6 +79,13 @@ class ConsumerPerformanceService(PerformanceService):
self.new_consumer = new_consumer
self.settings = settings
+ assert version >= V_0_9_0_0 or (not new_consumer), \
+ "new_consumer is only supported if version >= 0.9.0.0, version %s" % str(version)
+
+ security_protocol = self.security_config.security_protocol
+ assert version >= V_0_9_0_0 or security_protocol == SecurityConfig.PLAINTEXT, \
+ "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))
+
# These less-frequently used settings can be updated manually after instantiation
self.fetch_size = None
self.socket_buffer_size = None
@@ -86,6 +94,9 @@ class ConsumerPerformanceService(PerformanceService):
self.group = None
self.from_latest = None
+ for node in self.nodes:
+ node.version = version
+
@property
def args(self):
"""Dictionary of arguments used to start the Consumer Performance script."""
@@ -127,7 +138,10 @@ class ConsumerPerformanceService(PerformanceService):
cmd += " /opt/%s/bin/kafka-consumer-perf-test.sh" % kafka_dir(node)
for key, value in self.args.items():
cmd += " --%s %s" % (key, value)
- cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE
+
+ if node.version >= V_0_9_0_0:
+ # This is only used for security settings
+ cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE
for key, value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))
@@ -136,6 +150,22 @@ class ConsumerPerformanceService(PerformanceService):
'stderr': ConsumerPerformanceService.STDERR_CAPTURE}
return cmd
+ def parse_results(self, line, version):
+ parts = line.split(',')
+ if version >= V_0_9_0_0:
+ result = {
+ 'total_mb': float(parts[2]),
+ 'mbps': float(parts[3]),
+ 'records_per_sec': float(parts[5]),
+ }
+ else:
+ result = {
+ 'total_mb': float(parts[3]),
+ 'mbps': float(parts[4]),
+ 'records_per_sec': float(parts[6]),
+ }
+ return result
+
def _worker(self, idx, node):
node.account.ssh("mkdir -p %s" % ConsumerPerformanceService.PERSISTENT_ROOT, allow_fail=False)
@@ -149,11 +179,6 @@ class ConsumerPerformanceService(PerformanceService):
last = None
for line in node.account.ssh_capture(cmd):
last = line
- # Parse and save the last line's information
- parts = last.split(',')
- self.results[idx-1] = {
- 'total_mb': float(parts[2]),
- 'mbps': float(parts[3]),
- 'records_per_sec': float(parts[5]),
- }
+ # Parse and save the last line's information
+ self.results[idx-1] = self.parse_results(last, node.version)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/services/performance/end_to_end_latency.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py
index 049eebc..08eff70 100644
--- a/tests/kafkatest/services/performance/end_to_end_latency.py
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -17,9 +17,11 @@ from kafkatest.services.performance import PerformanceService
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.services.kafka.directory import kafka_dir
+from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0
class EndToEndLatencyService(PerformanceService):
+ MESSAGE_BYTES = 21 # 0.8.X messages are fixed at 21 bytes, so we'll match that for other versions
logs = {
"end_to_end_latency_log": {
@@ -27,37 +29,79 @@ class EndToEndLatencyService(PerformanceService):
"collect_default": True},
}
- def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
+ def __init__(self, context, num_nodes, kafka, topic, num_records, version=TRUNK, consumer_fetch_max_wait=100, acks=1):
super(EndToEndLatencyService, self).__init__(context, num_nodes)
self.kafka = kafka
self.security_config = kafka.security_config.client_config()
+
+ security_protocol = self.security_config.security_protocol
+ assert version >= V_0_9_0_0 or security_protocol == SecurityConfig.PLAINTEXT, \
+ "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))
+
self.args = {
'topic': topic,
'num_records': num_records,
'consumer_fetch_max_wait': consumer_fetch_max_wait,
'acks': acks,
- 'kafka_opts': self.security_config.kafka_opts
+ 'kafka_opts': self.security_config.kafka_opts,
+ 'message_bytes': EndToEndLatencyService.MESSAGE_BYTES
}
- def _worker(self, idx, node):
- args = self.args.copy()
- self.security_config.setup_node(node)
+ for node in self.nodes:
+ node.version = version
+
+ @property
+ def security_config_file(self):
if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
security_config_file = SecurityConfig.CONFIG_DIR + "/security.properties"
- node.account.create_file(security_config_file, str(self.security_config))
else:
security_config_file = ""
+ return security_config_file
+
+ def start_cmd(self, node):
+ args = self.args.copy()
args.update({
'zk_connect': self.kafka.zk.connect_setting(),
'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
- 'security_config_file': security_config_file,
+ 'security_config_file': self.security_config_file,
'kafka_dir': kafka_dir(node)
})
- cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % args
- cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d 20 %(security_config_file)s" % args
+ if node.version >= V_0_9_0_0:
+ """
+ val brokerList = args(0)
+ val topic = args(1)
+ val numMessages = args(2).toInt
+ val producerAcks = args(3)
+ val messageLen = args(4).toInt
+ """
+
+ cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % args
+ cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(security_config_file)s" % args
+ else:
+ """
+ val brokerList = args(0)
+ val zkConnect = args(1)
+ val topic = args(2)
+ val numMessages = args(3).toInt
+ val consumerFetchMaxWait = args(4).toInt
+ val producerAcks = args(5).toInt
+ """
+
+ # Set fetch max wait to 0 to match behavior in later versions
+ cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency " % args
+ cmd += "%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d 0 %(acks)d" % args
+
cmd += " | tee /mnt/end-to-end-latency.log"
+ return cmd
+
+ def _worker(self, idx, node):
+ self.security_config.setup_node(node)
+ if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
+ node.account.create_file(self.security_config_file, str(self.security_config))
+
+ cmd = self.start_cmd(node)
self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
results = {}
for line in node.account.ssh_capture(cmd):
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/services/performance/performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/performance.py b/tests/kafkatest/services/performance/performance.py
index 6d286f6..1eab197 100644
--- a/tests/kafkatest/services/performance/performance.py
+++ b/tests/kafkatest/services/performance/performance.py
@@ -27,3 +27,26 @@ class PerformanceService(BackgroundThreadService):
node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf /mnt/*", allow_fail=False)
+def throughput(records_per_sec, mb_per_sec):
+ """Helper method to ensure uniform representation of throughput data"""
+ return {
+ "records_per_sec": records_per_sec,
+ "mb_per_sec": mb_per_sec
+ }
+
+
+def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms):
+ """Helper method to ensure uniform representation of latency data"""
+ return {
+ "latency_50th_ms": latency_50th_ms,
+ "latency_99th_ms": latency_99th_ms,
+ "latency_999th_ms": latency_999th_ms
+ }
+
+
+def compute_aggregate_throughput(perf):
+ """Helper method for computing throughput after running a performance service."""
+ aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
+ aggregate_mbps = sum([r['mbps'] for r in perf.results])
+
+ return throughput(aggregate_rate, aggregate_mbps)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index 7cbc7bb..f4887ed 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -13,26 +13,56 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from ducktape.utils.util import wait_until
+
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.performance import PerformanceService
-import itertools
from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.services.kafka.directory import kafka_dir
+from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
+from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0
+
+import os
+import subprocess
+
class ProducerPerformanceService(JmxMixin, PerformanceService):
- logs = {
- "producer_performance_log": {
- "path": "/mnt/producer-performance.log",
- "collect_default": True},
- }
+ PERSISTENT_ROOT = "/mnt/producer_performance"
+ STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "producer_performance.stdout")
+ STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "producer_performance.stderr")
+ LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+ LOG_FILE = os.path.join(LOG_DIR, "producer_performance.log")
+ LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
- def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={},
+ def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, version=TRUNK, settings={},
intermediate_stats=False, client_id="producer-performance", jmx_object_names=None, jmx_attributes=[]):
JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
PerformanceService.__init__(self, context, num_nodes)
+
+ self.logs = {
+ "producer_performance_stdout": {
+ "path": ProducerPerformanceService.STDOUT_CAPTURE,
+ "collect_default": True},
+ "producer_performance_stderr": {
+ "path": ProducerPerformanceService.STDERR_CAPTURE,
+ "collect_default": True},
+ "producer_performance_log": {
+ "path": ProducerPerformanceService.LOG_FILE,
+ "collect_default": True},
+ "jmx_log": {
+ "path": "/mnt/jmx_tool.log",
+ "collect_default": jmx_object_names is not None
+ }
+
+ }
+
self.kafka = kafka
self.security_config = kafka.security_config.client_config()
+
+ security_protocol = self.security_config.security_protocol
+ assert version >= V_0_9_0_0 or security_protocol == SecurityConfig.PLAINTEXT, \
+ "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))
+
self.args = {
'topic': topic,
'kafka_opts': self.security_config.kafka_opts,
@@ -44,7 +74,10 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
self.intermediate_stats = intermediate_stats
self.client_id = client_id
- def _worker(self, idx, node):
+ for node in self.nodes:
+ node.version = version
+
+ def start_cmd(self, node):
args = self.args.copy()
args.update({
'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
@@ -52,48 +85,92 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
'client_id': self.client_id,
'kafka_directory': kafka_dir(node)
})
- cmd = "JMX_PORT=%(jmx_port)d KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_directory)s/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
+
+ cmd = ""
+
+ if node.version < TRUNK:
+ # In order to ensure more consistent configuration between versions, always use the ProducerPerformance
+ # tool from trunk
+ cmd += "for file in /opt/%s/tools/build/libs/kafka-tools*.jar; do CLASSPATH=$CLASSPATH:$file; done; " % KAFKA_TRUNK
+ cmd += "for file in /opt/%s/tools/build/dependant-libs-${SCALA_VERSION}*/*.jar; do CLASSPATH=$CLASSPATH:$file; done; " % KAFKA_TRUNK
+ cmd += "export CLASSPATH; "
+
+ cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % ProducerPerformanceService.LOG4J_CONFIG
+ cmd += "JMX_PORT=%(jmx_port)d KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_directory)s/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
"--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args
self.security_config.setup_node(node)
if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
self.settings.update(self.security_config.properties)
+
for key, value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))
- cmd += " | tee /mnt/producer-performance.log"
+ cmd += " 2>>%s | tee %s" % (ProducerPerformanceService.STDERR_CAPTURE, ProducerPerformanceService.STDOUT_CAPTURE)
+ return cmd
+
+ def pids(self, node):
+ try:
+ cmd = "jps | grep -i ProducerPerformance | awk '{print $1}'"
+ pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
+ return pid_arr
+ except (subprocess.CalledProcessError, ValueError) as e:
+ return []
+
+ def alive(self, node):
+ return len(self.pids(node)) > 0
+
+ def _worker(self, idx, node):
+
+ node.account.ssh("mkdir -p %s" % ProducerPerformanceService.PERSISTENT_ROOT, allow_fail=False)
+
+ # Create and upload log properties
+ log_config = self.render('tools_log4j.properties', log_file=ProducerPerformanceService.LOG_FILE)
+ node.account.create_file(ProducerPerformanceService.LOG4J_CONFIG, log_config)
+
+ cmd = self.start_cmd(node)
self.logger.debug("Producer performance %d command: %s", idx, cmd)
- def parse_stats(line):
- parts = line.split(',')
- return {
- 'records': int(parts[0].split()[0]),
- 'records_per_sec': float(parts[1].split()[0]),
- 'mbps': float(parts[1].split('(')[1].split()[0]),
- 'latency_avg_ms': float(parts[2].split()[0]),
- 'latency_max_ms': float(parts[3].split()[0]),
- 'latency_50th_ms': float(parts[4].split()[0]),
- 'latency_95th_ms': float(parts[5].split()[0]),
- 'latency_99th_ms': float(parts[6].split()[0]),
- 'latency_999th_ms': float(parts[7].split()[0]),
- }
- last = None
+ # start ProducerPerformance process
producer_output = node.account.ssh_capture(cmd)
+ wait_until(lambda: self.alive(node), timeout_sec=20, err_msg="ProducerPerformance failed to start")
+ # block until there is at least one line of output
first_line = next(producer_output, None)
+ if first_line is None:
+ raise Exception("No output from ProducerPerformance")
+
+ self.start_jmx_tool(idx, node)
+ wait_until(lambda: not self.alive(node), timeout_sec=1200, err_msg="ProducerPerformance failed to finish")
+ self.read_jmx_output(idx, node)
+
+ # parse producer output from file
+ last = None
+ producer_output = node.account.ssh_capture("cat %s" % ProducerPerformanceService.STDOUT_CAPTURE)
+ for line in producer_output:
+ if self.intermediate_stats:
+ try:
+ self.stats[idx-1].append(self.parse_stats(line))
+ except:
+ # Sometimes there are extraneous log messages
+ pass
- if first_line is not None:
- self.start_jmx_tool(idx, node)
- for line in itertools.chain([first_line], producer_output):
- if self.intermediate_stats:
- try:
- self.stats[idx-1].append(parse_stats(line))
- except:
- # Sometimes there are extraneous log messages
- pass
-
- last = line
- try:
- self.results[idx-1] = parse_stats(last)
- except:
- raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last))
- self.read_jmx_output(idx, node)
+ last = line
+ try:
+ self.results[idx-1] = self.parse_stats(last)
+ except:
+ raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last))
+
+ def parse_stats(self, line):
+
+ parts = line.split(',')
+ return {
+ 'records': int(parts[0].split()[0]),
+ 'records_per_sec': float(parts[1].split()[0]),
+ 'mbps': float(parts[1].split('(')[1].split()[0]),
+ 'latency_avg_ms': float(parts[2].split()[0]),
+ 'latency_max_ms': float(parts[3].split()[0]),
+ 'latency_50th_ms': float(parts[4].split()[0]),
+ 'latency_95th_ms': float(parts[5].split()[0]),
+ 'latency_99th_ms': float(parts[6].split()[0]),
+ 'latency_999th_ms': float(parts[7].split()[0]),
+ }
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/services/templates/tools_log4j.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/tools_log4j.properties b/tests/kafkatest/services/templates/tools_log4j.properties
index 6fec1d6..55ae4e0 100644
--- a/tests/kafkatest/services/templates/tools_log4j.properties
+++ b/tests/kafkatest/services/templates/tools_log4j.properties
@@ -22,4 +22,4 @@ log4j.appender.FILE.ImmediateFlush=true
# Set the append to true
log4j.appender.FILE.Append=true
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
-log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n
\ No newline at end of file
+log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/vagrant/base.sh
----------------------------------------------------------------------
diff --git a/vagrant/base.sh b/vagrant/base.sh
index d271f87..da7737c 100644
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -63,7 +63,9 @@ get_kafka() {
}
get_kafka 0.8.2.2
+chmod a+rw /opt/kafka-0.8.2.2
get_kafka 0.9.0.1
+chmod a+rw /opt/kafka-0.9.0.1
# For EC2 nodes, we want to use /mnt, which should have the local disk. On local
# VMs, we can just create it if it doesn't exist and use it like we'd use