You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/12/02 15:08:42 UTC

kafka git commit: KAFKA-2642; Run replication tests with SSL and SASL clients

Repository: kafka
Updated Branches:
  refs/heads/trunk 69269e76a -> cff03f8b6


KAFKA-2642; Run replication tests with SSL and SASL clients

For SSL and SASL replication tests, set security protocol for clients as well.

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Ben Stopford <be...@gmail.com>, Geoff Anderson <ge...@confluent.io>, Jun Rao <ju...@gmail.com>

Closes #563 from rajinisivaram/KAFKA-2642


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cff03f8b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cff03f8b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cff03f8b

Branch: refs/heads/trunk
Commit: cff03f8b68604ce8c8c743801b075b655af23011
Parents: 69269e7
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Wed Dec 2 08:08:37 2015 -0600
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Dec 2 08:08:37 2015 -0600

----------------------------------------------------------------------
 tests/kafkatest/tests/compatibility_test.py            | 2 +-
 tests/kafkatest/tests/quota_test.py                    | 2 +-
 tests/kafkatest/tests/replication_test.py              | 9 +++++----
 tests/kafkatest/tests/security_rolling_upgrade_test.py | 2 +-
 tests/kafkatest/tests/upgrade_test.py                  | 2 +-
 5 files changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cff03f8b/tests/kafkatest/tests/compatibility_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/compatibility_test.py b/tests/kafkatest/tests/compatibility_test.py
index 0310d2f..47e2752 100644
--- a/tests/kafkatest/tests/compatibility_test.py
+++ b/tests/kafkatest/tests/compatibility_test.py
@@ -32,7 +32,7 @@ class ClientCompatibilityTest(Test):
         self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=LATEST_0_8_2, topics={self.topic: {
                                                                     "partitions": 3,
                                                                     "replication-factor": 3,
-                                                                    "min.insync.replicas": 2}})
+                                                                    'configs': {"min.insync.replicas": 2}}})
         self.zk.start()
         self.kafka.start()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cff03f8b/tests/kafkatest/tests/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/quota_test.py b/tests/kafkatest/tests/quota_test.py
index 2649d7d..7c2ec59 100644
--- a/tests/kafkatest/tests/quota_test.py
+++ b/tests/kafkatest/tests/quota_test.py
@@ -50,7 +50,7 @@ class QuotaTest(Test):
         self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
                                   security_protocol='PLAINTEXT',
                                   interbroker_security_protocol='PLAINTEXT',
-                                  topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'min.insync.replicas': 1}},
+                                  topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}},
                                   quota_config=self.quota_config,
                                   jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
                                                     'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],

http://git-wip-us.apache.org/repos/asf/kafka/blob/cff03f8b/tests/kafkatest/tests/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py
index 6633a4f..a8f2337 100644
--- a/tests/kafkatest/tests/replication_test.py
+++ b/tests/kafkatest/tests/replication_test.py
@@ -93,9 +93,9 @@ class ReplicationTest(ProduceConsumeValidateTest):
         self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
                                                                     "partitions": 3,
                                                                     "replication-factor": 3,
-                                                                    "min.insync.replicas": 2}
+                                                                    'configs': {"min.insync.replicas": 2}}
                                                                 })
-        self.producer_throughput = 10000
+        self.producer_throughput = 1000
         self.num_producers = 1
         self.num_consumers = 1
 
@@ -123,10 +123,11 @@ class ReplicationTest(ProduceConsumeValidateTest):
             - Validate that every acked message was consumed
         """
 
-        self.kafka.security_protocol = 'PLAINTEXT'
+        self.kafka.security_protocol = security_protocol
         self.kafka.interbroker_security_protocol = security_protocol
+        new_consumer = False if  self.kafka.security_protocol == "PLAINTEXT" else True
         self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=60000, message_validator=is_int)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
         self.kafka.start()
         
         self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self))

http://git-wip-us.apache.org/repos/asf/kafka/blob/cff03f8b/tests/kafkatest/tests/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/security_rolling_upgrade_test.py b/tests/kafkatest/tests/security_rolling_upgrade_test.py
index 279cd26..1acf58b 100644
--- a/tests/kafkatest/tests/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/security_rolling_upgrade_test.py
@@ -40,7 +40,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
             "partitions": 3,
             "replication-factor": 3,
-            "min.insync.replicas": 2}})
+            'configs': {"min.insync.replicas": 2}}})
         self.zk.start()
 
         #reduce replica.lag.time.max.ms due to KAFKA-2827

http://git-wip-us.apache.org/repos/asf/kafka/blob/cff03f8b/tests/kafkatest/tests/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/upgrade_test.py b/tests/kafkatest/tests/upgrade_test.py
index 245129a..ea6f7ac 100644
--- a/tests/kafkatest/tests/upgrade_test.py
+++ b/tests/kafkatest/tests/upgrade_test.py
@@ -33,7 +33,7 @@ class TestUpgrade(ProduceConsumeValidateTest):
         self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=LATEST_0_8_2, topics={self.topic: {
                                                                     "partitions": 3,
                                                                     "replication-factor": 3,
-                                                                    "min.insync.replicas": 2}})
+                                                                    'configs': {"min.insync.replicas": 2}}})
         self.zk.start()
         self.kafka.start()