You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/09 22:47:14 UTC

kafka git commit: KAFKA-3634; Upgrade tests for SASL authentication

Repository: kafka
Updated Branches:
  refs/heads/trunk 36ed00d9b -> 87285f36c


KAFKA-3634; Upgrade tests for SASL authentication

Add a test for changing SASL mechanism using rolling upgrade and a test for rolling upgrade from 0.9.0.x to 0.10.0 with SASL/GSSAPI.

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Ben Stopford <be...@gmail.com>, Geoff Anderson <ge...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #1290 from rajinisivaram/KAFKA-3634


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/87285f36
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/87285f36
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/87285f36

Branch: refs/heads/trunk
Commit: 87285f36c9cd8e1d9861f6dfaacef978772fb7f1
Parents: 36ed00d
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Mon May 9 23:47:04 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon May 9 23:47:04 2016 +0100

----------------------------------------------------------------------
 .../tests/core/security_rolling_upgrade_test.py | 63 ++++++++++++++++++++
 tests/kafkatest/tests/core/upgrade_test.py      |  6 +-
 2 files changed, 68 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/87285f36/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index 3977490..e14d001 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -20,6 +20,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.utils import is_int
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from ducktape.mark import parametrize
 from ducktape.mark import matrix
 from kafkatest.services.security.kafka_acls import ACLs
 import time
@@ -74,6 +75,9 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
 
         # Roll cluster to disable PLAINTEXT port
         self.kafka.close_port('PLAINTEXT')
+        self.set_authorizer_and_bounce(client_protocol, broker_protocol)
+
+    def set_authorizer_and_bounce(self, client_protocol, broker_protocol):
         self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
         self.acls.set_acls(client_protocol, self.kafka, self.zk, self.topic, self.group)
         self.acls.set_acls(broker_protocol, self.kafka, self.zk, self.topic, self.group)
@@ -85,6 +89,19 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.kafka.start_minikdc()
         self.bounce()
 
+    def add_sasl_mechanism(self, new_client_sasl_mechanism):
+        self.kafka.client_sasl_mechanism = new_client_sasl_mechanism
+        self.kafka.start_minikdc()
+        self.bounce()
+
+    def roll_in_sasl_mechanism(self, security_protocol, new_sasl_mechanism):
+        # Roll cluster to update inter-broker SASL mechanism. This disables the old mechanism.
+        self.kafka.interbroker_sasl_mechanism = new_sasl_mechanism
+        self.bounce()
+
+        # Bounce again with ACLs for new mechanism
+        self.set_authorizer_and_bounce(security_protocol, security_protocol)
+
     @matrix(client_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"])
     def test_rolling_upgrade_phase_one(self, client_protocol):
         """
@@ -125,3 +142,49 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
 
         #Roll in the security protocol. Disable Plaintext. Ensure we can produce and Consume throughout
         self.run_produce_consume_validate(self.roll_in_secured_settings, client_protocol, broker_protocol)
+
+    @parametrize(new_client_sasl_mechanism='PLAIN')
+    def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism):
+        """
+        Start with a SASL/GSSAPI cluster, add new SASL mechanism, via a rolling upgrade, ensuring we could produce
+        and consume throughout over SASL/GSSAPI. Finally check we can produce and consume using new mechanism.
+        """
+        self.kafka.interbroker_security_protocol = "SASL_SSL"
+        self.kafka.security_protocol = "SASL_SSL"
+        self.kafka.client_sasl_mechanism = "GSSAPI"
+        self.kafka.interbroker_sasl_mechanism = "GSSAPI"
+        self.kafka.start()
+
+        # Create SASL/GSSAPI producer and consumer
+        self.create_producer_and_consumer()
+
+        # Rolling upgrade, adding new SASL mechanism, ensuring the GSSAPI producer/consumer continues to run
+        self.run_produce_consume_validate(self.add_sasl_mechanism, new_client_sasl_mechanism)
+
+        # Now we can produce and consume using the new SASL mechanism
+        self.kafka.client_sasl_mechanism = new_client_sasl_mechanism
+        self.create_producer_and_consumer()
+        self.run_produce_consume_validate(lambda: time.sleep(1))
+
+    @parametrize(new_sasl_mechanism='PLAIN')
+    def test_rolling_upgrade_sasl_mechanism_phase_two(self, new_sasl_mechanism):
+        """
+        Start with a SASL cluster with GSSAPI for inter-broker and a second mechanism for clients (i.e. result of phase one).
+        Start Producer and Consumer using the second mechanism
+        Incrementally upgrade to set inter-broker to the second mechanism and disable GSSAPI
+        Incrementally upgrade again to add ACLs
+        Ensure the producer and consumer run throughout
+        """
+        #Start with a broker that has GSSAPI for inter-broker and a second mechanism for clients
+        self.kafka.security_protocol = "SASL_SSL"
+        self.kafka.interbroker_security_protocol = "SASL_SSL"
+        self.kafka.client_sasl_mechanism = new_sasl_mechanism
+        self.kafka.interbroker_sasl_mechanism = "GSSAPI"
+        self.kafka.start()
+
+        #Create Producer and Consumer using second mechanism
+        self.create_producer_and_consumer()
+
+        #Roll in the second SASL mechanism for inter-broker, disabling first mechanism. Ensure we can produce and consume throughout
+        self.run_produce_consume_validate(self.roll_in_sasl_mechanism, self.kafka.security_protocol, new_sasl_mechanism)
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/87285f36/tests/kafkatest/tests/core/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py
index 790b69d..16a518d 100644
--- a/tests/kafkatest/tests/core/upgrade_test.py
+++ b/tests/kafkatest/tests/core/upgrade_test.py
@@ -61,6 +61,7 @@ class TestUpgrade(ProduceConsumeValidateTest):
 
 
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"])
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], new_consumer=True, security_protocol="SASL_SSL")
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"], new_consumer=True)
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"])
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"], new_consumer=True)
@@ -70,7 +71,8 @@ class TestUpgrade(ProduceConsumeValidateTest):
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"], new_consumer=True)
     @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"])
     @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"])
-    def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types, new_consumer=False):
+    def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types,
+                     new_consumer=False, security_protocol="PLAINTEXT"):
         """Test upgrade of Kafka broker cluster from 0.8.2 or 0.9.0 to 0.10
 
         from_kafka_version is a Kafka version to upgrade from: either 0.8.2.X or 0.9
@@ -93,6 +95,8 @@ class TestUpgrade(ProduceConsumeValidateTest):
                                   version=KafkaVersion(from_kafka_version),
                                   topics={self.topic: {"partitions": 3, "replication-factor": 3,
                                                        'configs': {"min.insync.replicas": 2}}})
+        self.kafka.security_protocol = security_protocol
+        self.kafka.interbroker_security_protocol = security_protocol
         self.kafka.start()
 
         self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,