You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/06/27 23:49:22 UTC

[kafka] branch trunk updated: KAFKA-6944; Add system tests testing the new throttling behavior using older clients/brokers

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a3db23c  KAFKA-6944; Add system tests testing the new throttling behavior using older clients/brokers
a3db23c is described below

commit a3db23cb76436786ebe0e5f388b4ca97dec51686
Author: Jon Lee <jo...@linkedin.com>
AuthorDate: Wed Jun 27 16:49:12 2018 -0700

    KAFKA-6944; Add system tests testing the new throttling behavior using older clients/brokers
    
    Added two additional test cases to quota_test.py, which run between brokers and clients with different throttling behaviors. More specifically,
    1. clients with new throttling behavior (i.e., post-KIP-219) and brokers with old throttling behavior (i.e., pre-KIP-219)
    2. clients with old throttling behavior and brokers with new throttling behavior
    
    Author: Jon Lee <jo...@linkedin.com>
    Author: Dong Lin <li...@gmail.com>
    
    Reviewers: Dong Lin <li...@gmail.com>
    
    Closes #5294 from jonlee2/kafka-6944
---
 tests/kafkatest/tests/client/quota_test.py | 23 +++++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)

diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
index 47a6a96..c084e08 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -21,6 +21,7 @@ from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.performance import ProducerPerformanceService
 from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.version import DEV_BRANCH, V_1_1_0
 
 class QuotaConfig(object):
     CLIENT_ID = 'client-id'
@@ -119,7 +120,6 @@ class QuotaTest(Test):
 
     def setUp(self):
         self.zk.start()
-        self.kafka.start()
 
     def min_cluster_size(self):
         """Override this since we're adding services outside of the constructor"""
@@ -128,15 +128,30 @@ class QuotaTest(Test):
     @cluster(num_nodes=5)
     @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False])
     @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
-    def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):
+    @parametrize(quota_type=QuotaConfig.CLIENT_ID, old_broker_throttling_behavior=True)
+    @parametrize(quota_type=QuotaConfig.CLIENT_ID, old_client_throttling_behavior=True)
+    def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1,
+                   old_broker_throttling_behavior=False, old_client_throttling_behavior=False):
+        # With the old (pre-2.0) throttling behavior, the broker throttles before sending a response to the client.
+        if old_broker_throttling_behavior:
+            self.kafka.set_version(V_1_1_0)
+        self.kafka.start()
+
         self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
         producer_client_id = self.quota_config.client_id
         consumer_client_id = self.quota_config.client_id
 
+        # With the old (pre-2.0) throttling behavior, the client does not throttle upon receiving a response with a non-zero throttle time.
+        if old_client_throttling_behavior:
+            client_version = V_1_1_0
+        else:
+            client_version = DEV_BRANCH
+
         # Produce all messages
         producer = ProducerPerformanceService(
             self.test_context, producer_num, self.kafka,
-            topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id)
+            topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1,
+            client_id=producer_client_id, version=client_version)
 
         producer.run()
 
@@ -144,7 +159,7 @@ class QuotaTest(Test):
         consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
             consumer_timeout_ms=60000, client_id=consumer_client_id,
             jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s' % consumer_client_id],
-            jmx_attributes=['bytes-consumed-rate'])
+            jmx_attributes=['bytes-consumed-rate'], version=client_version)
         consumer.run()
 
         for idx, messages in consumer.messages_consumed.iteritems():