You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2020/04/29 01:02:06 UTC
[kafka] branch trunk updated: MINOR: add support for kafka 2.4 and
2.5 to downgrade test
This is an automated email from the ASF dual-hosted git repository.
gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4e1d6a3 MINOR: add support for kafka 2.4 and 2.5 to downgrade test
4e1d6a3 is described below
commit 4e1d6a3d0432ad05e3a4f703c45c4a6cd5b53ca4
Author: Lucas Bradstreet <lu...@confluent.io>
AuthorDate: Tue Apr 28 18:01:24 2020 -0700
MINOR: add support for kafka 2.4 and 2.5 to downgrade test
The downgrade test does not currently support 2.4 and 2.5. When you enable them, it fails as a result of consumer group static membership. This PR makes the downgrade test work with all of our released versions again.
Author: Lucas Bradstreet <lu...@confluent.io>
Reviewers: Boyang Chen, Gwen Shapira
Closes #8518 from lbradstreet/downgrade-test-2.4-2.5
---
tests/kafkatest/tests/core/downgrade_test.py | 21 +++++++++++++++------
1 file changed, 15 insertions(+), 6 deletions(-)
diff --git a/tests/kafkatest/tests/core/downgrade_test.py b/tests/kafkatest/tests/core/downgrade_test.py
index 45e9ab9..7ed3cfb 100644
--- a/tests/kafkatest/tests/core/downgrade_test.py
+++ b/tests/kafkatest/tests/core/downgrade_test.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ducktape.mark import parametrize
+from ducktape.mark import parametrize, matrix
from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until
@@ -24,7 +24,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.end_to_end import EndToEndTest
from kafkatest.utils import is_int
-from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, V_0_9_0_0, V_0_11_0_0, DEV_BRANCH, KafkaVersion
+from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, V_0_9_0_0, V_0_11_0_0, DEV_BRANCH, KafkaVersion
class TestDowngrade(EndToEndTest):
PARTITIONS = 3
@@ -57,7 +57,7 @@ class TestDowngrade(EndToEndTest):
self.kafka.start_node(node)
self.wait_until_rejoin()
- def setup_services(self, kafka_version, compression_types, security_protocol):
+ def setup_services(self, kafka_version, compression_types, security_protocol, static_membership):
self.create_zookeeper()
self.zk.start()
@@ -73,7 +73,9 @@ class TestDowngrade(EndToEndTest):
self.producer.start()
self.create_consumer(log_level="DEBUG",
- version=kafka_version)
+ version=kafka_version,
+ static_membership=static_membership)
+
self.consumer.start()
def wait_until_rejoin(self):
@@ -82,6 +84,12 @@ class TestDowngrade(EndToEndTest):
timeout_sec=60, backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")
@cluster(num_nodes=7)
+ @matrix(version=[str(LATEST_2_5)], compression_types=[["none"]], static_membership=[False, True])
+ @parametrize(version=str(LATEST_2_5), compression_types=["zstd"], security_protocol="SASL_SSL")
+ # static membership was introduced with a buggy verifiable console consumer which
+ # required static membership to be enabled
+ @parametrize(version=str(LATEST_2_4), compression_types=["none"], static_membership=True)
+ @parametrize(version=str(LATEST_2_4), compression_types=["zstd"], security_protocol="SASL_SSL", static_membership=True)
@parametrize(version=str(LATEST_2_3), compression_types=["none"])
@parametrize(version=str(LATEST_2_3), compression_types=["zstd"], security_protocol="SASL_SSL")
@parametrize(version=str(LATEST_2_2), compression_types=["none"])
@@ -92,7 +100,8 @@ class TestDowngrade(EndToEndTest):
@parametrize(version=str(LATEST_2_0), compression_types=["snappy"], security_protocol="SASL_SSL")
@parametrize(version=str(LATEST_1_1), compression_types=["none"])
@parametrize(version=str(LATEST_1_1), compression_types=["lz4"], security_protocol="SASL_SSL")
- def test_upgrade_and_downgrade(self, version, compression_types, security_protocol="PLAINTEXT"):
+ def test_upgrade_and_downgrade(self, version, compression_types, security_protocol="PLAINTEXT",
+ static_membership=False):
"""Test upgrade and downgrade of Kafka cluster from old versions to the current version
`version` is the Kafka version to upgrade from and downgrade back to
@@ -113,7 +122,7 @@ class TestDowngrade(EndToEndTest):
"""
kafka_version = KafkaVersion(version)
- self.setup_services(kafka_version, compression_types, security_protocol)
+ self.setup_services(kafka_version, compression_types, security_protocol, static_membership)
self.await_startup()
self.logger.info("First pass bounce - rolling upgrade")