You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/11/17 00:30:57 UTC
kafka git commit: KAFKA-4211; Update system test services to use the new consumer by default
Repository: kafka
Updated Branches:
refs/heads/trunk 04a13e82a -> 249e41368
KAFKA-4211; Update system test services to use the new consumer by default
Update system test method signatures and method calls to use the new consumer by default.
Author: Vahid Hashemian <va...@us.ibm.com>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #2060 from vahidhashemian/KAFKA-4211
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/249e4136
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/249e4136
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/249e4136
Branch: refs/heads/trunk
Commit: 249e4136852be49579eef53b2bce0b2506797666
Parents: 04a13e8
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Wed Nov 16 16:30:48 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Nov 16 16:30:48 2016 -0800
----------------------------------------------------------------------
.../sanity_checks/test_console_consumer.py | 2 +-
.../sanity_checks/test_performance_services.py | 8 +++----
tests/kafkatest/services/console_consumer.py | 2 +-
tests/kafkatest/services/kafka/kafka.py | 4 ++--
tests/kafkatest/services/mirror_maker.py | 2 +-
.../performance/consumer_performance.py | 2 +-
.../tests/client/message_format_change_test.py | 2 +-
tests/kafkatest/tests/client/quota_test.py | 1 -
tests/kafkatest/tests/connect/connect_test.py | 2 +-
.../core/compatibility_test_new_broker_test.py | 22 ++++++++++----------
.../tests/core/security_rolling_upgrade_test.py | 2 +-
tests/kafkatest/tests/core/security_test.py | 2 +-
tests/kafkatest/tests/core/throttling_test.py | 1 -
tests/kafkatest/tests/core/upgrade_test.py | 22 ++++++++++----------
.../core/zookeeper_security_upgrade_test.py | 2 +-
15 files changed, 37 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/sanity_checks/test_console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 773a561..18cbfb7 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -37,7 +37,7 @@ class ConsoleConsumerTest(Test):
self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
- self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic)
+ self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, new_consumer=False)
def setUp(self):
self.zk.start()
http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/sanity_checks/test_performance_services.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_performance_services.py b/tests/kafkatest/sanity_checks/test_performance_services.py
index 94a61bc..7b5946a 100644
--- a/tests/kafkatest/sanity_checks/test_performance_services.py
+++ b/tests/kafkatest/sanity_checks/test_performance_services.py
@@ -37,12 +37,12 @@ class PerformanceServiceTest(Test):
# We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check,
# the overhead should be manageable.
- @parametrize(version=str(LATEST_0_8_2))
+ @parametrize(version=str(LATEST_0_8_2), new_consumer=False)
@parametrize(version=str(LATEST_0_9), new_consumer=False)
- @parametrize(version=str(LATEST_0_9), new_consumer=True)
+ @parametrize(version=str(LATEST_0_9))
@parametrize(version=str(TRUNK), new_consumer=False)
- @parametrize(version=str(TRUNK), new_consumer=True)
- def test_version(self, version=str(LATEST_0_9), new_consumer=False):
+ @parametrize(version=str(TRUNK))
+ def test_version(self, version=str(LATEST_0_9), new_consumer=True):
"""
Sanity check out producer performance service - verify that we can run the service with a small
number of messages. The actual stats here are pretty meaningless since the number of messages is quite small.
http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index dae6095..050ea6d 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -87,7 +87,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
"collect_default": True}
}
- def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=False,
+ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=True,
message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=TRUNK,
client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None,
enable_systest_events=False, stop_timeout_sec=15):
http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 4ac53b1..c79f8c8 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -471,7 +471,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx))
return self.get_node(leader_idx)
- def list_consumer_groups(self, node=None, new_consumer=False, command_config=None):
+ def list_consumer_groups(self, node=None, new_consumer=True, command_config=None):
""" Get list of consumer groups.
"""
if node is None:
@@ -498,7 +498,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.logger.debug(output)
return output
- def describe_consumer_group(self, group, node=None, new_consumer=False, command_config=None):
+ def describe_consumer_group(self, group, node=None, new_consumer=True, command_config=None):
""" Describe a consumer group.
"""
if node is None:
http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
index 734452c..14af4cf 100644
--- a/tests/kafkatest/services/mirror_maker.py
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -72,7 +72,7 @@ class MirrorMaker(KafkaPathResolverMixin, Service):
}
def __init__(self, context, num_nodes, source, target, whitelist=None, blacklist=None, num_streams=1,
- new_consumer=False, consumer_timeout_ms=None, offsets_storage="kafka",
+ new_consumer=True, consumer_timeout_ms=None, offsets_storage="kafka",
offset_commit_interval_ms=60000, log_level="DEBUG", producer_interceptor_classes=None):
"""
MirrorMaker mirrors messages from one or more source clusters to a single destination cluster.
http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/services/performance/consumer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py
index a17545d..3e02a5b 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -70,7 +70,7 @@ class ConsumerPerformanceService(PerformanceService):
"collect_default": True}
}
- def __init__(self, context, num_nodes, kafka, topic, messages, version=TRUNK, new_consumer=False, settings={}):
+ def __init__(self, context, num_nodes, kafka, topic, messages, version=TRUNK, new_consumer=True, settings={}):
super(ConsumerPerformanceService, self).__init__(context, num_nodes)
self.kafka = kafka
self.security_config = kafka.security_config.client_config()
http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/tests/client/message_format_change_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/message_format_change_test.py b/tests/kafkatest/tests/client/message_format_change_test.py
index a1ebf22..a57c04b 100644
--- a/tests/kafkatest/tests/client/message_format_change_test.py
+++ b/tests/kafkatest/tests/client/message_format_change_test.py
@@ -48,7 +48,7 @@ class MessageFormatChangeTest(ProduceConsumeValidateTest):
message_validator=is_int,
version=KafkaVersion(producer_version))
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
- self.topic, consumer_timeout_ms=30000,
+ self.topic, new_consumer=False, consumer_timeout_ms=30000,
message_validator=is_int, version=KafkaVersion(consumer_version))
self.consumer.group_id = group
self.run_produce_consume_validate(lambda: wait_until(
http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/tests/client/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
index 2abd089..1d31569 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -141,7 +141,6 @@ class QuotaTest(Test):
# Consume all messages
consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
- new_consumer=True,
consumer_timeout_ms=60000, client_id=consumer_client_id,
jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s' % consumer_client_id],
jmx_attributes=['bytes-consumed-rate'])
http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/tests/connect/connect_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 93f5734..83acb4a 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -83,7 +83,7 @@ class ConnectStandaloneFileTest(Test):
self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC,
- consumer_timeout_ms=1000, new_consumer=True)
+ consumer_timeout_ms=1000)
self.zk.start()
http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
index 7fe875d..d6a0a12 100644
--- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
+++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
@@ -43,17 +43,17 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
self.num_consumers = 1
self.messages_per_producer = 1000
- @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], timestamp_type=None)
- @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None)
- @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=None)
- @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None)
- @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=None)
- @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("CreateTime"))
- @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime"))
- @parametrize(producer_version=str(LATEST_0_10_0), consumer_version=str(LATEST_0_10_0), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime"))
- @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime"))
- @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=str("LogAppendTime"))
- def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=False, timestamp_type=None):
+ @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None)
+ @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
+ @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=None)
+ @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
+ @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"], timestamp_type=None)
+ @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("CreateTime"))
+ @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
+ @parametrize(producer_version=str(LATEST_0_10_0), consumer_version=str(LATEST_0_10_0), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
+ @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
+ @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=str("LogAppendTime"))
+ def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None):
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
"partitions": 3,
http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index e14d001..51b2e60 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -54,7 +54,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.consumer = ConsoleConsumer(
self.test_context, self.num_consumers, self.kafka, self.topic,
- consumer_timeout_ms=60000, message_validator=is_int, new_consumer=True)
+ consumer_timeout_ms=60000, message_validator=is_int)
self.consumer.group_id = "group"
http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/tests/core/security_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py
index 8c150a2..b6bc656 100644
--- a/tests/kafkatest/tests/core/security_test.py
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -102,5 +102,5 @@ class SecurityTest(ProduceConsumeValidateTest):
def create_producer_and_consumer(self):
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
- self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=True, consumer_timeout_ms=10000, message_validator=is_int)
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=10000, message_validator=is_int)
http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/tests/core/throttling_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py
index efc36d4..2e21322 100644
--- a/tests/kafkatest/tests/core/throttling_test.py
+++ b/tests/kafkatest/tests/core/throttling_test.py
@@ -163,7 +163,6 @@ class ThrottlingTest(ProduceConsumeValidateTest):
self.num_consumers,
self.kafka,
self.topic,
- new_consumer=True,
consumer_timeout_ms=60000,
message_validator=is_int,
from_beginning=False)
http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/tests/core/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py
index 062f3e5..15a9696 100644
--- a/tests/kafkatest/tests/core/upgrade_test.py
+++ b/tests/kafkatest/tests/core/upgrade_test.py
@@ -60,21 +60,21 @@ class TestUpgrade(ProduceConsumeValidateTest):
node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
self.kafka.start_node(node)
+ @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
@parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"])
- @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"], new_consumer=True)
- @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"])
- @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], new_consumer=True, security_protocol="SASL_SSL")
- @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"], new_consumer=True)
+ @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], new_consumer=False)
+ @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], security_protocol="SASL_SSL")
+ @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"])
+ @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"], new_consumer=False)
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"])
- @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"], new_consumer=True)
- @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["none"])
- @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True)
+ @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False)
+ @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["snappy"])
+ @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"], new_consumer=False)
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"])
- @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"], new_consumer=True)
- @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"])
- @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"])
+ @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"], new_consumer=False)
+ @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types,
- new_consumer=False, security_protocol="PLAINTEXT"):
+ new_consumer=True, security_protocol="PLAINTEXT"):
"""Test upgrade of Kafka broker cluster from 0.8.2, 0.9.0 or 0.10.0 to the current version
from_kafka_version is a Kafka version to upgrade from: either 0.8.2.X, 0.9.0.x or 0.10.0.x
http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
index 582eb68..0cfdf16 100644
--- a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
+++ b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
@@ -52,7 +52,7 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
self.consumer = ConsoleConsumer(
self.test_context, self.num_consumers, self.kafka, self.topic,
- consumer_timeout_ms=60000, message_validator=is_int, new_consumer=True)
+ consumer_timeout_ms=60000, message_validator=is_int)
self.consumer.group_id = self.group