You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/06/27 23:49:22 UTC
[kafka] branch trunk updated: KAFKA-6944; Add system tests testing the new throttling behavior using older clients/brokers
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a3db23c KAFKA-6944; Add system tests testing the new throttling behavior using older clients/brokers
a3db23c is described below
commit a3db23cb76436786ebe0e5f388b4ca97dec51686
Author: Jon Lee <jo...@linkedin.com>
AuthorDate: Wed Jun 27 16:49:12 2018 -0700
KAFKA-6944; Add system tests testing the new throttling behavior using older clients/brokers
Added two additional test cases to quota_test.py that run clients and brokers with different throttling behaviors against each other. More specifically:
1. clients with new throttling behavior (i.e., post-KIP-219) and brokers with old throttling behavior (i.e., pre-KIP-219)
2. clients with old throttling behavior and brokers with new throttling behavior
Author: Jon Lee <jo...@linkedin.com>
Author: Dong Lin <li...@gmail.com>
Reviewers: Dong Lin <li...@gmail.com>
Closes #5294 from jonlee2/kafka-6944
---
tests/kafkatest/tests/client/quota_test.py | 23 +++++++++++++++++++----
1 file changed, 19 insertions(+), 4 deletions(-)
diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
index 47a6a96..c084e08 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -21,6 +21,7 @@ from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.performance import ProducerPerformanceService
from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.version import DEV_BRANCH, V_1_1_0
class QuotaConfig(object):
CLIENT_ID = 'client-id'
@@ -119,7 +120,6 @@ class QuotaTest(Test):
def setUp(self):
self.zk.start()
- self.kafka.start()
def min_cluster_size(self):
"""Override this since we're adding services outside of the constructor"""
@@ -128,15 +128,30 @@ class QuotaTest(Test):
@cluster(num_nodes=5)
@matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False])
@parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
- def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):
+ @parametrize(quota_type=QuotaConfig.CLIENT_ID, old_broker_throttling_behavior=True)
+ @parametrize(quota_type=QuotaConfig.CLIENT_ID, old_client_throttling_behavior=True)
+ def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1,
+ old_broker_throttling_behavior=False, old_client_throttling_behavior=False):
+ # Old (pre-2.0) throttling behavior for broker throttles before sending a response to the client.
+ if old_broker_throttling_behavior:
+ self.kafka.set_version(V_1_1_0)
+ self.kafka.start()
+
self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
producer_client_id = self.quota_config.client_id
consumer_client_id = self.quota_config.client_id
+ # Old (pre-2.0) throttling behavior for client does not throttle upon receiving a response with a non-zero throttle time.
+ if old_client_throttling_behavior:
+ client_version = V_1_1_0
+ else:
+ client_version = DEV_BRANCH
+
# Produce all messages
producer = ProducerPerformanceService(
self.test_context, producer_num, self.kafka,
- topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id)
+ topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1,
+ client_id=producer_client_id, version=client_version)
producer.run()
@@ -144,7 +159,7 @@ class QuotaTest(Test):
consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
consumer_timeout_ms=60000, client_id=consumer_client_id,
jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s' % consumer_client_id],
- jmx_attributes=['bytes-consumed-rate'])
+ jmx_attributes=['bytes-consumed-rate'], version=client_version)
consumer.run()
for idx, messages in consumer.messages_consumed.iteritems():