You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:42 UTC
[26/50] [abbrv] kafka git commit: KAFKA-3483: Restructure ducktape
tests to simplify running subsets of tests
KAFKA-3483: Restructure ducktape tests to simplify running subsets of tests
Author: Grant Henke <gr...@gmail.com>
Reviewers: Geoff Anderson <ge...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1162 from granthenke/ducktape-structure
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45c585b4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45c585b4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45c585b4
Branch: refs/heads/0.10.0
Commit: 45c585b4f7e3d5e5dd5297b4d121badbd2052922
Parents: 83cf385
Author: Grant Henke <gr...@gmail.com>
Authored: Sun Apr 3 20:04:36 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Sun Apr 3 20:04:36 2016 -0700
----------------------------------------------------------------------
tests/kafkatest/benchmarks/__init__.py | 14 +
tests/kafkatest/benchmarks/core/__init__.py | 15 +
.../kafkatest/benchmarks/core/benchmark_test.py | 253 ++++++++++++++++
tests/kafkatest/tests/benchmark_test.py | 253 ----------------
tests/kafkatest/tests/client/__init__.py | 15 +
.../kafkatest/tests/client/compression_test.py | 85 ++++++
.../client/consumer_rolling_upgrade_test.py | 82 ++++++
tests/kafkatest/tests/client/consumer_test.py | 291 +++++++++++++++++++
.../tests/client/message_format_change_test.py | 92 ++++++
tests/kafkatest/tests/client/quota_test.py | 170 +++++++++++
.../tests/compatibility_test_new_broker_test.py | 78 -----
tests/kafkatest/tests/compression_test.py | 85 ------
tests/kafkatest/tests/connect/__init__.py | 15 +
.../tests/connect/connect_distributed_test.py | 210 +++++++++++++
.../tests/connect/connect_rest_test.py | 165 +++++++++++
tests/kafkatest/tests/connect/connect_test.py | 93 ++++++
.../templates/connect-distributed.properties | 46 +++
.../templates/connect-file-sink.properties | 20 ++
.../templates/connect-file-source.properties | 20 ++
.../templates/connect-standalone.properties | 32 ++
.../kafkatest/tests/connect_distributed_test.py | 210 -------------
tests/kafkatest/tests/connect_rest_test.py | 165 -----------
tests/kafkatest/tests/connect_test.py | 93 ------
.../tests/consumer_group_command_test.py | 106 -------
.../tests/consumer_rolling_upgrade_test.py | 82 ------
tests/kafkatest/tests/consumer_test.py | 291 -------------------
tests/kafkatest/tests/core/__init__.py | 15 +
.../core/compatibility_test_new_broker_test.py | 78 +++++
.../tests/core/consumer_group_command_test.py | 106 +++++++
.../tests/core/get_offset_shell_test.py | 91 ++++++
tests/kafkatest/tests/core/mirror_maker_test.py | 179 ++++++++++++
.../tests/core/reassign_partitions_test.py | 110 +++++++
tests/kafkatest/tests/core/replication_test.py | 152 ++++++++++
.../tests/core/security_rolling_upgrade_test.py | 128 ++++++++
.../tests/core/simple_consumer_shell_test.py | 75 +++++
tests/kafkatest/tests/core/upgrade_test.py | 105 +++++++
.../core/zookeeper_security_upgrade_test.py | 117 ++++++++
tests/kafkatest/tests/get_offset_shell_test.py | 91 ------
tests/kafkatest/tests/log4j_appender_test.py | 93 ------
.../tests/message_format_change_test.py | 92 ------
tests/kafkatest/tests/mirror_maker_test.py | 179 ------------
tests/kafkatest/tests/quota_test.py | 170 -----------
.../kafkatest/tests/reassign_partitions_test.py | 110 -------
tests/kafkatest/tests/replication_test.py | 152 ----------
.../tests/security_rolling_upgrade_test.py | 128 --------
.../tests/simple_consumer_shell_test.py | 75 -----
tests/kafkatest/tests/streams/__init__.py | 15 +
.../tests/streams/streams_bounce_test.py | 73 +++++
.../tests/streams/streams_smoke_test.py | 75 +++++
tests/kafkatest/tests/streams_bounce_test.py | 73 -----
tests/kafkatest/tests/streams_smoke_test.py | 75 -----
.../templates/connect-distributed.properties | 46 ---
.../templates/connect-file-sink.properties | 20 --
.../templates/connect-file-source.properties | 20 --
.../templates/connect-standalone.properties | 32 --
tests/kafkatest/tests/tools/__init__.py | 15 +
.../tests/tools/log4j_appender_test.py | 93 ++++++
tests/kafkatest/tests/upgrade_test.py | 105 -------
.../tests/zookeeper_security_upgrade_test.py | 117 --------
59 files changed, 3045 insertions(+), 2941 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/benchmarks/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/__init__.py b/tests/kafkatest/benchmarks/__init__.py
new file mode 100644
index 0000000..ec20143
--- /dev/null
+++ b/tests/kafkatest/benchmarks/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/benchmarks/core/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/core/__init__.py b/tests/kafkatest/benchmarks/core/__init__.py
new file mode 100644
index 0000000..ebc9bb3
--- /dev/null
+++ b/tests/kafkatest/benchmarks/core/__init__.py
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/benchmarks/core/benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py
new file mode 100644
index 0000000..9c2e32d
--- /dev/null
+++ b/tests/kafkatest/benchmarks/core/benchmark_test.py
@@ -0,0 +1,253 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.service import Service
+from ducktape.tests.test import Test
+from ducktape.mark import parametrize
+from ducktape.mark import matrix
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService
+
+
+TOPIC_REP_ONE = "topic-replication-factor-one"
+TOPIC_REP_THREE = "topic-replication-factor-three"
+DEFAULT_RECORD_SIZE = 100 # bytes
+
+
+class Benchmark(Test):
+ """A benchmark of Kafka producer/consumer performance. This replicates the test
+ run here:
+ https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
+ """
+ def __init__(self, test_context):
+ super(Benchmark, self).__init__(test_context)
+ self.num_zk = 1
+ self.num_brokers = 3
+ self.topics = {
+ TOPIC_REP_ONE: {'partitions': 6, 'replication-factor': 1},
+ TOPIC_REP_THREE: {'partitions': 6, 'replication-factor': 3}
+ }
+
+ self.zk = ZookeeperService(test_context, self.num_zk)
+
+ self.msgs_large = 10000000
+ self.batch_size = 8*1024
+ self.buffer_memory = 64*1024*1024
+ self.msg_sizes = [10, 100, 1000, 10000, 100000]
+ self.target_data_size = 128*1024*1024
+ self.target_data_size_gb = self.target_data_size/float(1024*1024*1024)
+
+ def setUp(self):
+ self.zk.start()
+
+ def start_kafka(self, security_protocol, interbroker_security_protocol):
+ self.kafka = KafkaService(
+ self.test_context, self.num_brokers,
+ self.zk, security_protocol=security_protocol,
+ interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
+ self.kafka.log_level = "INFO" # We don't DEBUG logging here
+ self.kafka.start()
+
+ @parametrize(acks=1, topic=TOPIC_REP_ONE)
+ @parametrize(acks=1, topic=TOPIC_REP_THREE)
+ @parametrize(acks=-1, topic=TOPIC_REP_THREE)
+ @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
+ @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], security_protocol=['PLAINTEXT', 'SSL'])
+ def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT'):
+ """
+ Setup: 1 node zk + 3 node kafka cluster
+ Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor,
+ security protocol and message size are varied depending on arguments injected into this test.
+
+ Collect and return aggregate throughput statistics after all messages have been acknowledged.
+ (This runs ProducerPerformance.java under the hood)
+ """
+ self.start_kafka(security_protocol, security_protocol)
+ # Always generate the same total amount of data
+ nrecords = int(self.target_data_size / message_size)
+
+ self.producer = ProducerPerformanceService(
+ self.test_context, num_producers, self.kafka, topic=topic,
+ num_records=nrecords, record_size=message_size, throughput=-1,
+ settings={
+ 'acks': acks,
+ 'batch.size': self.batch_size,
+ 'buffer.memory': self.buffer_memory})
+ self.producer.run()
+ return compute_aggregate_throughput(self.producer)
+
+ @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
+ @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+ def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol=None):
+ """
+ Setup: 1 node zk + 3 node kafka cluster
+ Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.
+
+ Collect and return aggregate throughput statistics after all messages have been acknowledged.
+
+ (This runs ProducerPerformance.java under the hood)
+ """
+ if interbroker_security_protocol is None:
+ interbroker_security_protocol = security_protocol
+ self.start_kafka(security_protocol, interbroker_security_protocol)
+ self.producer = ProducerPerformanceService(
+ self.test_context, 1, self.kafka,
+ topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
+ throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
+ intermediate_stats=True
+ )
+ self.producer.run()
+
+ summary = ["Throughput over long run, data > memory:"]
+ data = {}
+ # FIXME we should be generating a graph too
+ # Try to break it into 5 blocks, but fall back to a smaller number if
+ # there aren't even 5 elements
+ block_size = max(len(self.producer.stats[0]) / 5, 1)
+ nblocks = len(self.producer.stats[0]) / block_size
+
+ for i in range(nblocks):
+ subset = self.producer.stats[0][i*block_size:min((i+1)*block_size, len(self.producer.stats[0]))]
+ if len(subset) == 0:
+ summary.append(" Time block %d: (empty)" % i)
+ data[i] = None
+ else:
+ records_per_sec = sum([stat['records_per_sec'] for stat in subset])/float(len(subset))
+ mb_per_sec = sum([stat['mbps'] for stat in subset])/float(len(subset))
+
+ summary.append(" Time block %d: %f rec/sec (%f MB/s)" % (i, records_per_sec, mb_per_sec))
+ data[i] = throughput(records_per_sec, mb_per_sec)
+
+ self.logger.info("\n".join(summary))
+ return data
+
+
+ @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
+ @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
+ def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol=None):
+ """
+ Setup: 1 node zk + 3 node kafka cluster
+ Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3,
+ measuring the latency between production and consumption of each message.
+
+ Return aggregate latency statistics.
+
+ (Under the hood, this simply runs EndToEndLatency.scala)
+ """
+ if interbroker_security_protocol is None:
+ interbroker_security_protocol = security_protocol
+ self.start_kafka(security_protocol, interbroker_security_protocol)
+ self.logger.info("BENCHMARK: End to end latency")
+ self.perf = EndToEndLatencyService(
+ self.test_context, 1, self.kafka,
+ topic=TOPIC_REP_THREE, num_records=10000
+ )
+ self.perf.run()
+ return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
+
+ @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
+ @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
+ @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+ def test_producer_and_consumer(self, security_protocol, interbroker_security_protocol=None, new_consumer=True):
+ """
+ Setup: 1 node zk + 3 node kafka cluster
+ Concurrently produce and consume 10e6 messages with a single producer and a single consumer,
+ using new consumer if new_consumer == True
+
+ Return aggregate throughput statistics for both producer and consumer.
+
+ (Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala)
+ """
+ if interbroker_security_protocol is None:
+ interbroker_security_protocol = security_protocol
+ self.start_kafka(security_protocol, interbroker_security_protocol)
+ num_records = 10 * 1000 * 1000 # 10e6
+
+ self.producer = ProducerPerformanceService(
+ self.test_context, 1, self.kafka,
+ topic=TOPIC_REP_THREE,
+ num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
+ settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
+ )
+ self.consumer = ConsumerPerformanceService(
+ self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
+ Service.run_parallel(self.producer, self.consumer)
+
+ data = {
+ "producer": compute_aggregate_throughput(self.producer),
+ "consumer": compute_aggregate_throughput(self.consumer)
+ }
+ summary = [
+ "Producer + consumer:",
+ str(data)]
+ self.logger.info("\n".join(summary))
+ return data
+
+ @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
+ @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
+ @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+ def test_consumer_throughput(self, security_protocol, interbroker_security_protocol=None, new_consumer=True, num_consumers=1):
+ """
+ Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions
+ (using new consumer iff new_consumer == True), and report throughput.
+ """
+ if interbroker_security_protocol is None:
+ interbroker_security_protocol = security_protocol
+ self.start_kafka(security_protocol, interbroker_security_protocol)
+ num_records = 10 * 1000 * 1000 # 10e6
+
+ # seed kafka w/messages
+ self.producer = ProducerPerformanceService(
+ self.test_context, 1, self.kafka,
+ topic=TOPIC_REP_THREE,
+ num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
+ settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
+ )
+ self.producer.run()
+
+ # consume
+ self.consumer = ConsumerPerformanceService(
+ self.test_context, num_consumers, self.kafka,
+ topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
+ self.consumer.group = "test-consumer-group"
+ self.consumer.run()
+ return compute_aggregate_throughput(self.consumer)
+
+
+def throughput(records_per_sec, mb_per_sec):
+ """Helper method to ensure uniform representation of throughput data"""
+ return {
+ "records_per_sec": records_per_sec,
+ "mb_per_sec": mb_per_sec
+ }
+
+
+def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms):
+ """Helper method to ensure uniform representation of latency data"""
+ return {
+ "latency_50th_ms": latency_50th_ms,
+ "latency_99th_ms": latency_99th_ms,
+ "latency_999th_ms": latency_999th_ms
+ }
+
+
+def compute_aggregate_throughput(perf):
+ """Helper method for computing throughput after running a performance service."""
+ aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
+ aggregate_mbps = sum([r['mbps'] for r in perf.results])
+
+ return throughput(aggregate_rate, aggregate_mbps)
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/benchmark_test.py b/tests/kafkatest/tests/benchmark_test.py
deleted file mode 100644
index 9c2e32d..0000000
--- a/tests/kafkatest/tests/benchmark_test.py
+++ /dev/null
@@ -1,253 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.services.service import Service
-from ducktape.tests.test import Test
-from ducktape.mark import parametrize
-from ducktape.mark import matrix
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService
-
-
-TOPIC_REP_ONE = "topic-replication-factor-one"
-TOPIC_REP_THREE = "topic-replication-factor-three"
-DEFAULT_RECORD_SIZE = 100 # bytes
-
-
-class Benchmark(Test):
- """A benchmark of Kafka producer/consumer performance. This replicates the test
- run here:
- https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
- """
- def __init__(self, test_context):
- super(Benchmark, self).__init__(test_context)
- self.num_zk = 1
- self.num_brokers = 3
- self.topics = {
- TOPIC_REP_ONE: {'partitions': 6, 'replication-factor': 1},
- TOPIC_REP_THREE: {'partitions': 6, 'replication-factor': 3}
- }
-
- self.zk = ZookeeperService(test_context, self.num_zk)
-
- self.msgs_large = 10000000
- self.batch_size = 8*1024
- self.buffer_memory = 64*1024*1024
- self.msg_sizes = [10, 100, 1000, 10000, 100000]
- self.target_data_size = 128*1024*1024
- self.target_data_size_gb = self.target_data_size/float(1024*1024*1024)
-
- def setUp(self):
- self.zk.start()
-
- def start_kafka(self, security_protocol, interbroker_security_protocol):
- self.kafka = KafkaService(
- self.test_context, self.num_brokers,
- self.zk, security_protocol=security_protocol,
- interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
- self.kafka.log_level = "INFO" # We don't DEBUG logging here
- self.kafka.start()
-
- @parametrize(acks=1, topic=TOPIC_REP_ONE)
- @parametrize(acks=1, topic=TOPIC_REP_THREE)
- @parametrize(acks=-1, topic=TOPIC_REP_THREE)
- @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
- @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], security_protocol=['PLAINTEXT', 'SSL'])
- def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT'):
- """
- Setup: 1 node zk + 3 node kafka cluster
- Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor,
- security protocol and message size are varied depending on arguments injected into this test.
-
- Collect and return aggregate throughput statistics after all messages have been acknowledged.
- (This runs ProducerPerformance.java under the hood)
- """
- self.start_kafka(security_protocol, security_protocol)
- # Always generate the same total amount of data
- nrecords = int(self.target_data_size / message_size)
-
- self.producer = ProducerPerformanceService(
- self.test_context, num_producers, self.kafka, topic=topic,
- num_records=nrecords, record_size=message_size, throughput=-1,
- settings={
- 'acks': acks,
- 'batch.size': self.batch_size,
- 'buffer.memory': self.buffer_memory})
- self.producer.run()
- return compute_aggregate_throughput(self.producer)
-
- @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
- @matrix(security_protocol=['PLAINTEXT', 'SSL'])
- def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol=None):
- """
- Setup: 1 node zk + 3 node kafka cluster
- Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.
-
- Collect and return aggregate throughput statistics after all messages have been acknowledged.
-
- (This runs ProducerPerformance.java under the hood)
- """
- if interbroker_security_protocol is None:
- interbroker_security_protocol = security_protocol
- self.start_kafka(security_protocol, interbroker_security_protocol)
- self.producer = ProducerPerformanceService(
- self.test_context, 1, self.kafka,
- topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
- throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
- intermediate_stats=True
- )
- self.producer.run()
-
- summary = ["Throughput over long run, data > memory:"]
- data = {}
- # FIXME we should be generating a graph too
- # Try to break it into 5 blocks, but fall back to a smaller number if
- # there aren't even 5 elements
- block_size = max(len(self.producer.stats[0]) / 5, 1)
- nblocks = len(self.producer.stats[0]) / block_size
-
- for i in range(nblocks):
- subset = self.producer.stats[0][i*block_size:min((i+1)*block_size, len(self.producer.stats[0]))]
- if len(subset) == 0:
- summary.append(" Time block %d: (empty)" % i)
- data[i] = None
- else:
- records_per_sec = sum([stat['records_per_sec'] for stat in subset])/float(len(subset))
- mb_per_sec = sum([stat['mbps'] for stat in subset])/float(len(subset))
-
- summary.append(" Time block %d: %f rec/sec (%f MB/s)" % (i, records_per_sec, mb_per_sec))
- data[i] = throughput(records_per_sec, mb_per_sec)
-
- self.logger.info("\n".join(summary))
- return data
-
-
- @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
- @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
- def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol=None):
- """
- Setup: 1 node zk + 3 node kafka cluster
- Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3,
- measuring the latency between production and consumption of each message.
-
- Return aggregate latency statistics.
-
- (Under the hood, this simply runs EndToEndLatency.scala)
- """
- if interbroker_security_protocol is None:
- interbroker_security_protocol = security_protocol
- self.start_kafka(security_protocol, interbroker_security_protocol)
- self.logger.info("BENCHMARK: End to end latency")
- self.perf = EndToEndLatencyService(
- self.test_context, 1, self.kafka,
- topic=TOPIC_REP_THREE, num_records=10000
- )
- self.perf.run()
- return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
-
- @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
- @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
- @matrix(security_protocol=['PLAINTEXT', 'SSL'])
- def test_producer_and_consumer(self, security_protocol, interbroker_security_protocol=None, new_consumer=True):
- """
- Setup: 1 node zk + 3 node kafka cluster
- Concurrently produce and consume 10e6 messages with a single producer and a single consumer,
- using new consumer if new_consumer == True
-
- Return aggregate throughput statistics for both producer and consumer.
-
- (Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala)
- """
- if interbroker_security_protocol is None:
- interbroker_security_protocol = security_protocol
- self.start_kafka(security_protocol, interbroker_security_protocol)
- num_records = 10 * 1000 * 1000 # 10e6
-
- self.producer = ProducerPerformanceService(
- self.test_context, 1, self.kafka,
- topic=TOPIC_REP_THREE,
- num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
- settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
- )
- self.consumer = ConsumerPerformanceService(
- self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
- Service.run_parallel(self.producer, self.consumer)
-
- data = {
- "producer": compute_aggregate_throughput(self.producer),
- "consumer": compute_aggregate_throughput(self.consumer)
- }
- summary = [
- "Producer + consumer:",
- str(data)]
- self.logger.info("\n".join(summary))
- return data
-
- @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
- @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
- @matrix(security_protocol=['PLAINTEXT', 'SSL'])
- def test_consumer_throughput(self, security_protocol, interbroker_security_protocol=None, new_consumer=True, num_consumers=1):
- """
- Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions
- (using new consumer iff new_consumer == True), and report throughput.
- """
- if interbroker_security_protocol is None:
- interbroker_security_protocol = security_protocol
- self.start_kafka(security_protocol, interbroker_security_protocol)
- num_records = 10 * 1000 * 1000 # 10e6
-
- # seed kafka w/messages
- self.producer = ProducerPerformanceService(
- self.test_context, 1, self.kafka,
- topic=TOPIC_REP_THREE,
- num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
- settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
- )
- self.producer.run()
-
- # consume
- self.consumer = ConsumerPerformanceService(
- self.test_context, num_consumers, self.kafka,
- topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
- self.consumer.group = "test-consumer-group"
- self.consumer.run()
- return compute_aggregate_throughput(self.consumer)
-
-
-def throughput(records_per_sec, mb_per_sec):
- """Helper method to ensure uniform representation of throughput data"""
- return {
- "records_per_sec": records_per_sec,
- "mb_per_sec": mb_per_sec
- }
-
-
-def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms):
- """Helper method to ensure uniform representation of latency data"""
- return {
- "latency_50th_ms": latency_50th_ms,
- "latency_99th_ms": latency_99th_ms,
- "latency_999th_ms": latency_999th_ms
- }
-
-
-def compute_aggregate_throughput(perf):
- """Helper method for computing throughput after running a performance service."""
- aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
- aggregate_mbps = sum([r['mbps'] for r in perf.results])
-
- return throughput(aggregate_rate, aggregate_mbps)
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/client/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/__init__.py b/tests/kafkatest/tests/client/__init__.py
new file mode 100644
index 0000000..ebc9bb3
--- /dev/null
+++ b/tests/kafkatest/tests/client/__init__.py
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/client/compression_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/compression_test.py b/tests/kafkatest/tests/client/compression_test.py
new file mode 100644
index 0000000..0de53ae
--- /dev/null
+++ b/tests/kafkatest/tests/client/compression_test.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int_with_prefix
+
+class CompressionTest(ProduceConsumeValidateTest):
+ """
+ These tests validate produce / consume for compressed topics.
+ """
+
+ def __init__(self, test_context):
+ """:type test_context: ducktape.tests.test.TestContext"""
+ super(CompressionTest, self).__init__(test_context=test_context)
+
+ self.topic = "test_topic"
+ self.zk = ZookeeperService(test_context, num_nodes=1)
+ self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
+ "partitions": 10,
+ "replication-factor": 1}})
+ self.num_partitions = 10
+ self.timeout_sec = 60
+ self.producer_throughput = 1000
+ self.num_producers = 4
+ self.messages_per_producer = 1000
+ self.num_consumers = 1
+
+ def setUp(self):
+ self.zk.start()
+
+ def min_cluster_size(self):
+ # Override this since we're adding services outside of the constructor
+ return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+ @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=True)
+ @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=False)
+ def test_compressed_topic(self, compression_types, new_consumer):
+ """Test produce => consume => validate for compressed topics
+ Setup: 1 zk, 1 kafka node, 1 topic with partitions=10, replication-factor=1
+
+ compression_types parameter gives a list of compression types (or no compression if
+ "none"). Each producer in a VerifiableProducer group (num_producers = 4) will use a
+ compression type from the list based on producer's index in the group.
+
+ - Produce messages in the background
+ - Consume messages in the background
+ - Stop producing, and finish consuming
+ - Validate that every acked message was consumed
+ """
+
+ self.kafka.security_protocol = "PLAINTEXT"
+ self.kafka.interbroker_security_protocol = self.kafka.security_protocol
+ self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+ self.topic, throughput=self.producer_throughput,
+ message_validator=is_int_with_prefix,
+ compression_types=compression_types)
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic,
+ new_consumer=new_consumer, consumer_timeout_ms=60000,
+ message_validator=is_int_with_prefix)
+ self.kafka.start()
+
+ self.run_produce_consume_validate(lambda: wait_until(
+ lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+ timeout_sec=120, backoff_sec=1,
+ err_msg="Producer did not produce all messages in reasonable amount of time"))
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
new file mode 100644
index 0000000..3cd3c7c
--- /dev/null
+++ b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition
+
+class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
+ TOPIC = "test_topic"
+ NUM_PARTITIONS = 4
+ RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
+ ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
+
+ def __init__(self, test_context):
+ super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_consumers=2, num_producers=0,
+ num_zk=1, num_brokers=1, topics={
+ self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }
+ })
+
+ def _verify_range_assignment(self, consumer):
+ # range assignment should give us two partition sets: (0, 1) and (2, 3)
+ assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()])
+ assert assignment == set([
+ frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]),
+ frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])])
+
+ def _verify_roundrobin_assignment(self, consumer):
+ assignment = set([frozenset(x) for x in consumer.current_assignment().values()])
+ assert assignment == set([
+ frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
+ frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
+
+ def rolling_update_test(self):
+ """
+ Verify rolling updates of partition assignment strategies works correctly. In this
+ test, we use a rolling restart to change the group's assignment strategy from "range"
+ to "roundrobin." We verify after every restart that all members are still in the group
+ and that the correct assignment strategy was used.
+ """
+
+ # initialize the consumer using range assignment
+ consumer = self.setup_consumer(self.TOPIC, assignment_strategy=self.RANGE)
+
+ consumer.start()
+ self.await_all_members(consumer)
+ self._verify_range_assignment(consumer)
+
+ # change consumer configuration to prefer round-robin assignment, but still support range assignment
+ consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE
+
+ # restart one of the nodes and verify that we are still using range assignment
+ consumer.stop_node(consumer.nodes[0])
+ consumer.start_node(consumer.nodes[0])
+ self.await_all_members(consumer)
+ self._verify_range_assignment(consumer)
+
+ # now restart the other node and verify that we have switched to round-robin
+ consumer.stop_node(consumer.nodes[1])
+ consumer.start_node(consumer.nodes[1])
+ self.await_all_members(consumer)
+ self._verify_roundrobin_assignment(consumer)
+
+ # if we want, we can now drop support for range assignment
+ consumer.assignment_strategy = self.ROUND_ROBIN
+ for node in consumer.nodes:
+ consumer.stop_node(node)
+ consumer.start_node(node)
+ self.await_all_members(consumer)
+ self._verify_roundrobin_assignment(consumer)
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/client/consumer_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
new file mode 100644
index 0000000..084b19d
--- /dev/null
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -0,0 +1,291 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition
+
+import signal
+
+class OffsetValidationTest(VerifiableConsumerTest):
+ TOPIC = "test_topic"
+ NUM_PARTITIONS = 1
+
+ def __init__(self, test_context):
+ super(OffsetValidationTest, self).__init__(test_context, num_consumers=3, num_producers=1,
+ num_zk=1, num_brokers=2, topics={
+ self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 2 }
+ })
+
+ def rolling_bounce_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+ for _ in range(num_bounces):
+ for node in consumer.nodes:
+ consumer.stop_node(node, clean_shutdown)
+
+ wait_until(lambda: len(consumer.dead_nodes()) == 1,
+ timeout_sec=self.session_timeout_sec+5,
+ err_msg="Timed out waiting for the consumer to shutdown")
+
+ consumer.start_node(node)
+
+ self.await_all_members(consumer)
+ self.await_consumed_messages(consumer)
+
+ def bounce_all_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+ for _ in range(num_bounces):
+ for node in consumer.nodes:
+ consumer.stop_node(node, clean_shutdown)
+
+ wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, timeout_sec=10,
+ err_msg="Timed out waiting for the consumers to shutdown")
+
+ for node in consumer.nodes:
+ consumer.start_node(node)
+
+ self.await_all_members(consumer)
+ self.await_consumed_messages(consumer)
+
+ def rolling_bounce_brokers(self, consumer, num_bounces=5, clean_shutdown=True):
+ for _ in range(num_bounces):
+ for node in self.kafka.nodes:
+ self.kafka.restart_node(node, clean_shutdown=True)
+ self.await_all_members(consumer)
+ self.await_consumed_messages(consumer)
+
+ def test_broker_rolling_bounce(self):
+ """
+ Verify correct consumer behavior when the brokers are consecutively restarted.
+
+ Setup: single Kafka cluster with one producer writing messages to a single topic with one
+ partition, and a set of consumers in the same group reading from the same topic.
+
+ - Start a producer which continues producing new messages throughout the test.
+ - Start up the consumers and wait until they've joined the group.
+ - In a loop, restart each broker consecutively, waiting for the group to stabilize between
+ each broker restart.
+ - Verify delivery semantics according to the failure type and that the broker bounces
+ did not cause unexpected group rebalances.
+ """
+ partition = TopicPartition(self.TOPIC, 0)
+
+ producer = self.setup_producer(self.TOPIC)
+ consumer = self.setup_consumer(self.TOPIC)
+
+ producer.start()
+ self.await_produced_messages(producer)
+
+ consumer.start()
+ self.await_all_members(consumer)
+
+ num_rebalances = consumer.num_rebalances()
+ # TODO: make this test work with hard shutdowns, which probably requires
+ # pausing before the node is restarted to ensure that any ephemeral
+ # nodes have time to expire
+ self.rolling_bounce_brokers(consumer, clean_shutdown=True)
+
+ unexpected_rebalances = consumer.num_rebalances() - num_rebalances
+ assert unexpected_rebalances == 0, \
+ "Broker rolling bounce caused %d unexpected group rebalances" % unexpected_rebalances
+
+ consumer.stop_all()
+
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+
+ @matrix(clean_shutdown=[True, False], bounce_mode=["all", "rolling"])
+ def test_consumer_bounce(self, clean_shutdown, bounce_mode):
+ """
+ Verify correct consumer behavior when the consumers in the group are consecutively restarted.
+
+ Setup: single Kafka cluster with one producer and a set of consumers in one group.
+
+ - Start a producer which continues producing new messages throughout the test.
+ - Start up the consumers and wait until they've joined the group.
+ - In a loop, restart each consumer, waiting for each one to rejoin the group before
+ restarting the rest.
+ - Verify delivery semantics according to the failure type.
+ """
+ partition = TopicPartition(self.TOPIC, 0)
+
+ producer = self.setup_producer(self.TOPIC)
+ consumer = self.setup_consumer(self.TOPIC)
+
+ producer.start()
+ self.await_produced_messages(producer)
+
+ consumer.start()
+ self.await_all_members(consumer)
+
+ if bounce_mode == "all":
+ self.bounce_all_consumers(consumer, clean_shutdown=clean_shutdown)
+ else:
+ self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
+
+ consumer.stop_all()
+ if clean_shutdown:
+ # if the total records consumed matches the current position, we haven't seen any duplicates
+ # this can only be guaranteed with a clean shutdown
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+ else:
+ # we may have duplicates in a hard failure
+ assert consumer.current_position(partition) <= consumer.total_consumed(), \
+ "Current position greater than the total number of consumed records"
+
+ @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
+ def test_consumer_failure(self, clean_shutdown, enable_autocommit):
+ partition = TopicPartition(self.TOPIC, 0)
+
+ consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
+ producer = self.setup_producer(self.TOPIC)
+
+ consumer.start()
+ self.await_all_members(consumer)
+
+ partition_owner = consumer.owner(partition)
+ assert partition_owner is not None
+
+ # startup the producer and ensure that some records have been written
+ producer.start()
+ self.await_produced_messages(producer)
+
+ # stop the partition owner and await its shutdown
+ consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown)
+ wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers - 1) and consumer.owner(partition) != None,
+ timeout_sec=self.session_timeout_sec+5, err_msg="Timed out waiting for consumer to close")
+
+ # ensure that the remaining consumer does some work after rebalancing
+ self.await_consumed_messages(consumer, min_messages=1000)
+
+ consumer.stop_all()
+
+ if clean_shutdown:
+ # if the total records consumed matches the current position, we haven't seen any duplicates
+ # this can only be guaranteed with a clean shutdown
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+ else:
+ # we may have duplicates in a hard failure
+ assert consumer.current_position(partition) <= consumer.total_consumed(), \
+ "Current position greater than the total number of consumed records"
+
+ # if autocommit is not turned on, we can also verify the last committed offset
+ if not enable_autocommit:
+ assert consumer.last_commit(partition) == consumer.current_position(partition), \
+ "Last committed offset did not match last consumed position"
+
+
+ @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
+ def test_broker_failure(self, clean_shutdown, enable_autocommit):
+ partition = TopicPartition(self.TOPIC, 0)
+
+ consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
+ producer = self.setup_producer(self.TOPIC)
+
+ producer.start()
+ consumer.start()
+ self.await_all_members(consumer)
+
+ num_rebalances = consumer.num_rebalances()
+
+ # shutdown one of the brokers
+ # TODO: we need a way to target the coordinator instead of picking arbitrarily
+ self.kafka.signal_node(self.kafka.nodes[0], signal.SIGTERM if clean_shutdown else signal.SIGKILL)
+
+ # ensure that the consumers do some work after the broker failure
+ self.await_consumed_messages(consumer, min_messages=1000)
+
+ # verify that there were no rebalances on failover
+ assert num_rebalances == consumer.num_rebalances(), "Broker failure should not cause a rebalance"
+
+ consumer.stop_all()
+
+ # if the total records consumed matches the current position, we haven't seen any duplicates
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+
+ # if autocommit is not turned on, we can also verify the last committed offset
+ if not enable_autocommit:
+ assert consumer.last_commit(partition) == consumer.current_position(partition), \
+ "Last committed offset did not match last consumed position"
+
+ def test_group_consumption(self):
+ """
+ Verifies correct group rebalance behavior as consumers are started and stopped.
+ In particular, this test verifies that the partition is readable after every
+ expected rebalance.
+
+ Setup: single Kafka cluster with a group of consumers reading from one topic
+ with one partition while the verifiable producer writes to it.
+
+ - Start the consumers one by one, verifying consumption after each rebalance
+ - Shutdown the consumers one by one, verifying consumption after each rebalance
+ """
+ consumer = self.setup_consumer(self.TOPIC)
+ producer = self.setup_producer(self.TOPIC)
+
+ partition = TopicPartition(self.TOPIC, 0)
+
+ producer.start()
+
+ for num_started, node in enumerate(consumer.nodes, 1):
+ consumer.start_node(node)
+ self.await_members(consumer, num_started)
+ self.await_consumed_messages(consumer)
+
+ for num_stopped, node in enumerate(consumer.nodes, 1):
+ consumer.stop_node(node)
+
+ if num_stopped < self.num_consumers:
+ self.await_members(consumer, self.num_consumers - num_stopped)
+ self.await_consumed_messages(consumer)
+
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records did not match consumed position"
+
+ assert consumer.last_commit(partition) == consumer.current_position(partition), \
+ "Last committed offset did not match last consumed position"
+
+
+class AssignmentValidationTest(VerifiableConsumerTest):
+ TOPIC = "test_topic"
+ NUM_PARTITIONS = 6
+
+ def __init__(self, test_context):
+ super(AssignmentValidationTest, self).__init__(test_context, num_consumers=3, num_producers=0,
+ num_zk=1, num_brokers=2, topics={
+ self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 },
+ })
+
+ @matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
+ "org.apache.kafka.clients.consumer.RoundRobinAssignor"])
+ def test_valid_assignment(self, assignment_strategy):
+ """
+ Verify assignment strategy correctness: each partition is assigned to exactly
+ one consumer instance.
+
+ Setup: single Kafka cluster with a set of consumers in the same group.
+
+ - Start the consumers one by one
+ - Validate assignment after every expected rebalance
+ """
+ consumer = self.setup_consumer(self.TOPIC, assignment_strategy=assignment_strategy)
+ for num_started, node in enumerate(consumer.nodes, 1):
+ consumer.start_node(node)
+ self.await_members(consumer, num_started)
+ assert self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment())
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/client/message_format_change_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/message_format_change_test.py b/tests/kafkatest/tests/client/message_format_change_test.py
new file mode 100644
index 0000000..357fd17
--- /dev/null
+++ b/tests/kafkatest/tests/client/message_format_change_test.py
@@ -0,0 +1,92 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.utils import is_int
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.services.kafka import config_property
+import time
+
+
+class MessageFormatChangeTest(ProduceConsumeValidateTest):
+
+ def __init__(self, test_context):
+ super(MessageFormatChangeTest, self).__init__(test_context=test_context)
+
+ def setUp(self):
+ self.topic = "test_topic"
+ self.zk = ZookeeperService(self.test_context, num_nodes=1)
+
+ self.zk.start()
+
+ # Producer and consumer
+ self.producer_throughput = 10000
+ self.num_producers = 1
+ self.num_consumers = 1
+ self.messages_per_producer = 100
+
+ def produce_and_consume(self, producer_version, consumer_version, group):
+ self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+ self.topic,
+ throughput=self.producer_throughput,
+ message_validator=is_int,
+ version=KafkaVersion(producer_version))
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+ self.topic, consumer_timeout_ms=30000,
+ message_validator=is_int, version=KafkaVersion(consumer_version))
+ self.consumer.group_id = group
+ self.run_produce_consume_validate(lambda: wait_until(
+ lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+ timeout_sec=120, backoff_sec=1,
+ err_msg="Producer did not produce all messages in reasonable amount of time"))
+
+ @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
+ @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
+ def test_compatibility(self, producer_version, consumer_version):
+ """ This test performs the following checks:
+ The workload is a mix of 0.9.x and 0.10.x producers and consumers
+ that produce to and consume from a 0.10.x cluster
+ 1. initially the topic is using message format 0.9.0
+ 2. change the message format version for topic to 0.10.0 on the fly.
+ 3. change the message format version for topic back to 0.9.0 on the fly.
+ - The producers and consumers should not have any issue.
+ - Note that for 0.9.x consumers/producers we only do steps 1 and 2
+ """
+ self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
+ "partitions": 3,
+ "replication-factor": 3,
+ 'configs': {"min.insync.replicas": 2}}})
+
+ self.kafka.start()
+ self.logger.info("First format change to 0.9.0")
+ self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
+ self.produce_and_consume(producer_version, consumer_version, "group1")
+
+ self.logger.info("Second format change to 0.10.0")
+ self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
+ self.produce_and_consume(producer_version, consumer_version, "group2")
+
+ if producer_version == str(TRUNK) and consumer_version == str(TRUNK):
+ self.logger.info("Third format change back to 0.9.0")
+ self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
+ self.produce_and_consume(producer_version, consumer_version, "group3")
+
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/client/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
new file mode 100644
index 0000000..7c2ec59
--- /dev/null
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark import parametrize
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.performance import ProducerPerformanceService
+from kafkatest.services.console_consumer import ConsoleConsumer
+
+
+class QuotaTest(Test):
+ """
+ These tests verify that quota provides expected functionality -- they run
+ producer, broker, and consumer with different clientId and quota configuration and
+ check that the observed throughput is close to the value we expect.
+ """
+
+ def __init__(self, test_context):
+ """:type test_context: ducktape.tests.test.TestContext"""
+ super(QuotaTest, self).__init__(test_context=test_context)
+
+ self.topic = 'test_topic'
+ self.logger.info('use topic ' + self.topic)
+
+ # quota related parameters
+ self.quota_config = {'quota_producer_default': 2500000,
+ 'quota_consumer_default': 2000000,
+ 'quota_producer_bytes_per_second_overrides': 'overridden_id=3750000',
+ 'quota_consumer_bytes_per_second_overrides': 'overridden_id=3000000'}
+ self.maximum_client_deviation_percentage = 100.0
+ self.maximum_broker_deviation_percentage = 5.0
+ self.num_records = 100000
+ self.record_size = 3000
+
+ self.zk = ZookeeperService(test_context, num_nodes=1)
+ self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
+ security_protocol='PLAINTEXT',
+ interbroker_security_protocol='PLAINTEXT',
+ topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}},
+ quota_config=self.quota_config,
+ jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
+ 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],
+ jmx_attributes=['OneMinuteRate'])
+ self.num_producers = 1
+ self.num_consumers = 2
+
+ def setUp(self):
+ self.zk.start()
+ self.kafka.start()
+
+ def min_cluster_size(self):
+ """Override this since we're adding services outside of the constructor"""
+ return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+ @parametrize(producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1)
+ @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=1)
+ @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=2)
+ def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1):
+ # Produce all messages
+ producer = ProducerPerformanceService(
+ self.test_context, producer_num, self.kafka,
+ topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_id,
+ jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id], jmx_attributes=['outgoing-byte-rate'])
+
+ producer.run()
+
+ # Consume all messages
+ consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
+ new_consumer=False,
+ consumer_timeout_ms=60000, client_id=consumer_id,
+ jmx_object_names=['kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s' % consumer_id],
+ jmx_attributes=['OneMinuteRate'])
+ consumer.run()
+
+ for idx, messages in consumer.messages_consumed.iteritems():
+ assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx
+
+ success, msg = self.validate(self.kafka, producer, consumer)
+ assert success, msg
+
+ def validate(self, broker, producer, consumer):
+ """
+ For each client_id we validate that:
+ 1) number of consumed messages equals number of produced messages
+ 2) maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
+ 3) maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
+ 4) maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
+ 5) maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
+ """
+ success = True
+ msg = ''
+
+ self.kafka.read_jmx_output_all_nodes()
+
+ # validate that number of consumed messages equals number of produced messages
+ produced_num = sum([value['records'] for value in producer.results])
+ consumed_num = sum([len(value) for value in consumer.messages_consumed.values()])
+ self.logger.info('producer produced %d messages' % produced_num)
+ self.logger.info('consumer consumed %d messages' % consumed_num)
+ if produced_num != consumed_num:
+ success = False
+ msg += "number of produced messages %d doesn't equal number of consumed messages %d" % (produced_num, consumed_num)
+
+ # validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
+ producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % producer.client_id
+ producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
+ producer_quota_bps = self.get_producer_quota(producer.client_id)
+ self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps))
+ if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
+ success = False
+ msg += 'maximum producer throughput %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
+ (producer_maximum_bps, producer_quota_bps, self.maximum_client_deviation_percentage)
+
+ # validate that maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
+ broker_byte_in_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:OneMinuteRate'
+ broker_maximum_byte_in_bps = broker.maximum_jmx_value[broker_byte_in_attribute_name]
+ self.logger.info('broker has maximum byte-in rate %.2f bps with producer quota %.2f bps' %
+ (broker_maximum_byte_in_bps, producer_quota_bps))
+ if broker_maximum_byte_in_bps > producer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
+ success = False
+ msg += 'maximum broker byte-in rate %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
+ (broker_maximum_byte_in_bps, producer_quota_bps, self.maximum_broker_deviation_percentage)
+
+ # validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
+ consumer_attribute_name = 'kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s:OneMinuteRate' % consumer.client_id
+ consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name]
+ consumer_quota_bps = self.get_consumer_quota(consumer.client_id)
+ self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f bps' % (consumer_maximum_bps, consumer_quota_bps))
+ if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
+ success = False
+ msg += 'maximum consumer throughput %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
+ (consumer_maximum_bps, consumer_quota_bps, self.maximum_client_deviation_percentage)
+
+ # validate that maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
+ broker_byte_out_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec:OneMinuteRate'
+ broker_maximum_byte_out_bps = broker.maximum_jmx_value[broker_byte_out_attribute_name]
+ self.logger.info('broker has maximum byte-out rate %.2f bps with consumer quota %.2f bps' %
+ (broker_maximum_byte_out_bps, consumer_quota_bps))
+ if broker_maximum_byte_out_bps > consumer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
+ success = False
+ msg += 'maximum broker byte-out rate %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
+ (broker_maximum_byte_out_bps, consumer_quota_bps, self.maximum_broker_deviation_percentage)
+
+ return success, msg
+
+ def get_producer_quota(self, client_id):
+ overridden_quotas = {value.split('=')[0]:value.split('=')[1] for value in self.quota_config['quota_producer_bytes_per_second_overrides'].split(',')}
+ if client_id in overridden_quotas:
+ return float(overridden_quotas[client_id])
+ return self.quota_config['quota_producer_default']
+
+ def get_consumer_quota(self, client_id):
+ overridden_quotas = {value.split('=')[0]:value.split('=')[1] for value in self.quota_config['quota_consumer_bytes_per_second_overrides'].split(',')}
+ if client_id in overridden_quotas:
+ return float(overridden_quotas[client_id])
+ return self.quota_config['quota_consumer_default']
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/compatibility_test_new_broker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/compatibility_test_new_broker_test.py
deleted file mode 100644
index 2c261df..0000000
--- a/tests/kafkatest/tests/compatibility_test_new_broker_test.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# Copyright 2015 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.tests.test import Test
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.utils import is_int
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.services.kafka import config_property
-
-# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
-class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
-
- def __init__(self, test_context):
- super(ClientCompatibilityTestNewBroker, self).__init__(test_context=test_context)
-
- def setUp(self):
- self.topic = "test_topic"
- self.zk = ZookeeperService(self.test_context, num_nodes=1)
-
- self.zk.start()
-
- # Producer and consumer
- self.producer_throughput = 10000
- self.num_producers = 1
- self.num_consumers = 1
- self.messages_per_producer = 1000
-
- @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], timestamp_type=None)
- @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None)
- @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=None)
- @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None)
- @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=None)
- @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("CreateTime"))
- @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime"))
- @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime"))
- @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=str("LogAppendTime"))
- def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=False, timestamp_type=None):
-
- self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
- "partitions": 3,
- "replication-factor": 3,
- 'configs': {"min.insync.replicas": 2}}})
- for node in self.kafka.nodes:
- if timestamp_type is not None:
- node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
- self.kafka.start()
-
- self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
- self.topic, throughput=self.producer_throughput,
- message_validator=is_int,
- compression_types=compression_types,
- version=KafkaVersion(producer_version))
-
- self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
- self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
- message_validator=is_int, version=KafkaVersion(consumer_version))
-
- self.run_produce_consume_validate(lambda: wait_until(
- lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
- timeout_sec=120, backoff_sec=1,
- err_msg="Producer did not produce all messages in reasonable amount of time"))
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/compression_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/compression_test.py b/tests/kafkatest/tests/compression_test.py
deleted file mode 100644
index 0de53ae..0000000
--- a/tests/kafkatest/tests/compression_test.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int_with_prefix
-
-class CompressionTest(ProduceConsumeValidateTest):
- """
- These tests validate produce / consume for compressed topics.
- """
-
- def __init__(self, test_context):
- """:type test_context: ducktape.tests.test.TestContext"""
- super(CompressionTest, self).__init__(test_context=test_context)
-
- self.topic = "test_topic"
- self.zk = ZookeeperService(test_context, num_nodes=1)
- self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
- "partitions": 10,
- "replication-factor": 1}})
- self.num_partitions = 10
- self.timeout_sec = 60
- self.producer_throughput = 1000
- self.num_producers = 4
- self.messages_per_producer = 1000
- self.num_consumers = 1
-
- def setUp(self):
- self.zk.start()
-
- def min_cluster_size(self):
- # Override this since we're adding services outside of the constructor
- return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers
-
- @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=True)
- @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=False)
- def test_compressed_topic(self, compression_types, new_consumer):
- """Test produce => consume => validate for compressed topics
- Setup: 1 zk, 1 kafka node, 1 topic with partitions=10, replication-factor=1
-
- compression_types parameter gives a list of compression types (or no compression if
- "none"). Each producer in a VerifiableProducer group (num_producers = 4) will use a
- compression type from the list based on producer's index in the group.
-
- - Produce messages in the background
- - Consume messages in the background
- - Stop producing, and finish consuming
- - Validate that every acked message was consumed
- """
-
- self.kafka.security_protocol = "PLAINTEXT"
- self.kafka.interbroker_security_protocol = self.kafka.security_protocol
- self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
- self.topic, throughput=self.producer_throughput,
- message_validator=is_int_with_prefix,
- compression_types=compression_types)
- self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic,
- new_consumer=new_consumer, consumer_timeout_ms=60000,
- message_validator=is_int_with_prefix)
- self.kafka.start()
-
- self.run_produce_consume_validate(lambda: wait_until(
- lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
- timeout_sec=120, backoff_sec=1,
- err_msg="Producer did not produce all messages in reasonable amount of time"))
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/connect/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/__init__.py b/tests/kafkatest/tests/connect/__init__.py
new file mode 100644
index 0000000..ebc9bb3
--- /dev/null
+++ b/tests/kafkatest/tests/connect/__init__.py
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults