You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2020/04/29 01:02:06 UTC

[kafka] branch trunk updated: MINOR: add support for kafka 2.4 and 2.5 to downgrade test

This is an automated email from the ASF dual-hosted git repository.

gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4e1d6a3  MINOR: add support for kafka 2.4 and 2.5 to downgrade test
4e1d6a3 is described below

commit 4e1d6a3d0432ad05e3a4f703c45c4a6cd5b53ca4
Author: Lucas Bradstreet <lu...@confluent.io>
AuthorDate: Tue Apr 28 18:01:24 2020 -0700

    MINOR: add support for kafka 2.4 and 2.5 to downgrade test
    
    The downgrade test does not currently support Kafka 2.4 and 2.5. When those versions are enabled, the test fails because of consumer group static membership. This PR makes the downgrade test work with all of our released versions again.
    
    Author: Lucas Bradstreet <lu...@confluent.io>
    
    Reviewers: Boyang Chen, Gwen Shapira
    
    Closes #8518 from lbradstreet/downgrade-test-2.4-2.5
---
 tests/kafkatest/tests/core/downgrade_test.py | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/tests/kafkatest/tests/core/downgrade_test.py b/tests/kafkatest/tests/core/downgrade_test.py
index 45e9ab9..7ed3cfb 100644
--- a/tests/kafkatest/tests/core/downgrade_test.py
+++ b/tests/kafkatest/tests/core/downgrade_test.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.mark import parametrize
+from ducktape.mark import parametrize, matrix
 from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 
@@ -24,7 +24,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.end_to_end import EndToEndTest
 from kafkatest.utils import is_int
-from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, V_0_9_0_0, V_0_11_0_0, DEV_BRANCH, KafkaVersion
+from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, V_0_9_0_0, V_0_11_0_0, DEV_BRANCH, KafkaVersion
 
 class TestDowngrade(EndToEndTest):
     PARTITIONS = 3
@@ -57,7 +57,7 @@ class TestDowngrade(EndToEndTest):
             self.kafka.start_node(node)
             self.wait_until_rejoin()
 
-    def setup_services(self, kafka_version, compression_types, security_protocol):
+    def setup_services(self, kafka_version, compression_types, security_protocol, static_membership):
         self.create_zookeeper()
         self.zk.start()
 
@@ -73,7 +73,9 @@ class TestDowngrade(EndToEndTest):
         self.producer.start()
 
         self.create_consumer(log_level="DEBUG",
-                             version=kafka_version)
+                             version=kafka_version,
+                             static_membership=static_membership)
+
         self.consumer.start()
 
     def wait_until_rejoin(self):
@@ -82,6 +84,12 @@ class TestDowngrade(EndToEndTest):
                     timeout_sec=60, backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")
 
     @cluster(num_nodes=7)
+    @matrix(version=[str(LATEST_2_5)], compression_types=[["none"]], static_membership=[False, True])
+    @parametrize(version=str(LATEST_2_5), compression_types=["zstd"], security_protocol="SASL_SSL")
+    # static membership was introduced with a buggy verifiable console consumer which
+    # required static membership to be enabled
+    @parametrize(version=str(LATEST_2_4), compression_types=["none"], static_membership=True)
+    @parametrize(version=str(LATEST_2_4), compression_types=["zstd"], security_protocol="SASL_SSL", static_membership=True)
     @parametrize(version=str(LATEST_2_3), compression_types=["none"])
     @parametrize(version=str(LATEST_2_3), compression_types=["zstd"], security_protocol="SASL_SSL")
     @parametrize(version=str(LATEST_2_2), compression_types=["none"])
@@ -92,7 +100,8 @@ class TestDowngrade(EndToEndTest):
     @parametrize(version=str(LATEST_2_0), compression_types=["snappy"], security_protocol="SASL_SSL")
     @parametrize(version=str(LATEST_1_1), compression_types=["none"])
     @parametrize(version=str(LATEST_1_1), compression_types=["lz4"], security_protocol="SASL_SSL")
-    def test_upgrade_and_downgrade(self, version, compression_types, security_protocol="PLAINTEXT"):
+    def test_upgrade_and_downgrade(self, version, compression_types, security_protocol="PLAINTEXT",
+            static_membership=False):
         """Test upgrade and downgrade of Kafka cluster from old versions to the current version
 
         `version` is the Kafka version to upgrade from and downgrade back to
@@ -113,7 +122,7 @@ class TestDowngrade(EndToEndTest):
         """
         kafka_version = KafkaVersion(version)
 
-        self.setup_services(kafka_version, compression_types, security_protocol)
+        self.setup_services(kafka_version, compression_types, security_protocol, static_membership)
         self.await_startup()
 
         self.logger.info("First pass bounce - rolling upgrade")