You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/12/02 15:08:42 UTC
kafka git commit: KAFKA-2642;
Run replication tests with SSL and SASL clients
Repository: kafka
Updated Branches:
refs/heads/trunk 69269e76a -> cff03f8b6
KAFKA-2642; Run replication tests with SSL and SASL clients
For SSL and SASL replication tests, set security protocol for clients as well.
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Ben Stopford <be...@gmail.com>, Geoff Anderson <ge...@confluent.io>, Jun Rao <ju...@gmail.com>
Closes #563 from rajinisivaram/KAFKA-2642
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cff03f8b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cff03f8b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cff03f8b
Branch: refs/heads/trunk
Commit: cff03f8b68604ce8c8c743801b075b655af23011
Parents: 69269e7
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Wed Dec 2 08:08:37 2015 -0600
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Dec 2 08:08:37 2015 -0600
----------------------------------------------------------------------
tests/kafkatest/tests/compatibility_test.py | 2 +-
tests/kafkatest/tests/quota_test.py | 2 +-
tests/kafkatest/tests/replication_test.py | 9 +++++----
tests/kafkatest/tests/security_rolling_upgrade_test.py | 2 +-
tests/kafkatest/tests/upgrade_test.py | 2 +-
5 files changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/cff03f8b/tests/kafkatest/tests/compatibility_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/compatibility_test.py b/tests/kafkatest/tests/compatibility_test.py
index 0310d2f..47e2752 100644
--- a/tests/kafkatest/tests/compatibility_test.py
+++ b/tests/kafkatest/tests/compatibility_test.py
@@ -32,7 +32,7 @@ class ClientCompatibilityTest(Test):
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=LATEST_0_8_2, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
- "min.insync.replicas": 2}})
+ 'configs': {"min.insync.replicas": 2}}})
self.zk.start()
self.kafka.start()
http://git-wip-us.apache.org/repos/asf/kafka/blob/cff03f8b/tests/kafkatest/tests/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/quota_test.py b/tests/kafkatest/tests/quota_test.py
index 2649d7d..7c2ec59 100644
--- a/tests/kafkatest/tests/quota_test.py
+++ b/tests/kafkatest/tests/quota_test.py
@@ -50,7 +50,7 @@ class QuotaTest(Test):
self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
security_protocol='PLAINTEXT',
interbroker_security_protocol='PLAINTEXT',
- topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'min.insync.replicas': 1}},
+ topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}},
quota_config=self.quota_config,
jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],
http://git-wip-us.apache.org/repos/asf/kafka/blob/cff03f8b/tests/kafkatest/tests/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py
index 6633a4f..a8f2337 100644
--- a/tests/kafkatest/tests/replication_test.py
+++ b/tests/kafkatest/tests/replication_test.py
@@ -93,9 +93,9 @@ class ReplicationTest(ProduceConsumeValidateTest):
self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
- "min.insync.replicas": 2}
+ 'configs': {"min.insync.replicas": 2}}
})
- self.producer_throughput = 10000
+ self.producer_throughput = 1000
self.num_producers = 1
self.num_consumers = 1
@@ -123,10 +123,11 @@ class ReplicationTest(ProduceConsumeValidateTest):
- Validate that every acked message was consumed
"""
- self.kafka.security_protocol = 'PLAINTEXT'
+ self.kafka.security_protocol = security_protocol
self.kafka.interbroker_security_protocol = security_protocol
+ new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
- self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=60000, message_validator=is_int)
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
self.kafka.start()
self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self))
http://git-wip-us.apache.org/repos/asf/kafka/blob/cff03f8b/tests/kafkatest/tests/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/security_rolling_upgrade_test.py b/tests/kafkatest/tests/security_rolling_upgrade_test.py
index 279cd26..1acf58b 100644
--- a/tests/kafkatest/tests/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/security_rolling_upgrade_test.py
@@ -40,7 +40,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
- "min.insync.replicas": 2}})
+ 'configs': {"min.insync.replicas": 2}}})
self.zk.start()
#reduce replica.lag.time.max.ms due to KAFKA-2827
http://git-wip-us.apache.org/repos/asf/kafka/blob/cff03f8b/tests/kafkatest/tests/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/upgrade_test.py b/tests/kafkatest/tests/upgrade_test.py
index 245129a..ea6f7ac 100644
--- a/tests/kafkatest/tests/upgrade_test.py
+++ b/tests/kafkatest/tests/upgrade_test.py
@@ -33,7 +33,7 @@ class TestUpgrade(ProduceConsumeValidateTest):
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=LATEST_0_8_2, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
- "min.insync.replicas": 2}})
+ 'configs': {"min.insync.replicas": 2}}})
self.zk.start()
self.kafka.start()