You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2022/10/27 16:21:14 UTC

[kafka] branch 3.3 updated: MINOR: Migrate connect system tests to KRaft (#12621)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new e3f89542471 MINOR: Migrate connect system tests to KRaft (#12621)
e3f89542471 is described below

commit e3f89542471919fbe80b1f4109eea0059f766fda
Author: srishti-saraswat <98...@users.noreply.github.com>
AuthorDate: Thu Oct 27 21:49:14 2022 +0530

    MINOR: Migrate connect system tests to KRaft (#12621)
    
    Adds the `metadata_quorum` parameter to the `@matrix(...)` (and, in `connect_test.py`, `@parametrize(...)`) annotations of many existing Connect system tests, so that they are run against both ZooKeeper (`zk`) and `remote_kraft` metadata quorums.
    
    Reviewers: Randall Hauch <rh...@gmail.com>, Greg Harris <gh...@gmail.com>
---
 .../tests/connect/connect_distributed_test.py      | 51 +++++++++++-----------
 tests/kafkatest/tests/connect/connect_rest_test.py |  5 ++-
 tests/kafkatest/tests/connect/connect_test.py      | 11 +++--
 3 files changed, 36 insertions(+), 31 deletions(-)

diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 8347afc8d66..3854573fc5b 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -20,7 +20,7 @@ from ducktape.mark import matrix, parametrize
 from ducktape.cluster.remoteaccount import RemoteCommandError
 
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService, config_property
+from kafkatest.services.kafka import KafkaService, config_property, quorum
 from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink, ConnectRestError, MockSink, MockSource
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
@@ -75,7 +75,7 @@ class ConnectDistributedTest(Test):
             self.TOPIC: {'partitions': 1, 'replication-factor': 1}
         }
 
-        self.zk = ZookeeperService(test_context, self.num_zk)
+        self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None
 
         self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
@@ -98,7 +98,8 @@ class ConnectDistributedTest(Test):
                                             include_filestream_connectors=include_filestream_connectors)
         self.cc.log_level = "DEBUG"
 
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
         self.kafka.start()
 
     def _start_connector(self, config_file):
@@ -164,8 +165,8 @@ class ConnectDistributedTest(Test):
         return self._task_has_state(task_id, status, 'RUNNING')
 
     @cluster(num_nodes=5)
-    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_restart_failed_connector(self, exactly_once_source, connect_protocol):
+    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_restart_failed_connector(self, exactly_once_source, connect_protocol, metadata_quorum):
         self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
@@ -187,8 +188,8 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see connector transition to the RUNNING state")
 
     @cluster(num_nodes=5)
-    @matrix(connector_type=['source', 'exactly-once source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_restart_failed_task(self, connector_type, connect_protocol):
+    @matrix(connector_type=['source', 'exactly-once source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_restart_failed_task(self, connector_type, connect_protocol, metadata_quorum):
         self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if connector_type == 'exactly-once source' else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
@@ -213,8 +214,8 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see task transition to the RUNNING state")
 
     @cluster(num_nodes=5)
-    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_restart_connector_and_tasks_failed_connector(self, connect_protocol):
+    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_restart_connector_and_tasks_failed_connector(self, connect_protocol, metadata_quorum):
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -232,8 +233,8 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see connector transition to the RUNNING state")
 
     @cluster(num_nodes=5)
-    @matrix(connector_type=['source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_restart_connector_and_tasks_failed_task(self, connector_type, connect_protocol):
+    @matrix(connector_type=['source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_restart_connector_and_tasks_failed_task(self, connector_type, connect_protocol, metadata_quorum):
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -257,8 +258,8 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see task transition to the RUNNING state")
 
     @cluster(num_nodes=5)
-    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_pause_and_resume_source(self, exactly_once_source, connect_protocol):
+    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_pause_and_resume_source(self, exactly_once_source, connect_protocol, metadata_quorum):
         """
         Verify that source connectors stop producing records when paused and begin again after
         being resumed.
@@ -299,8 +300,8 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to produce messages after resuming source connector")
 
     @cluster(num_nodes=5)
-    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_pause_and_resume_sink(self, connect_protocol):
+    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum):
         """
         Verify that sink connectors stop consuming records when paused and begin again after
         being resumed.
@@ -347,8 +348,8 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to consume messages after resuming sink connector")
 
     @cluster(num_nodes=5)
-    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_pause_state_persistent(self, exactly_once_source, connect_protocol):
+    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_pause_state_persistent(self, exactly_once_source, connect_protocol, metadata_quorum):
         """
         Verify that paused state is preserved after a cluster restart.
         """
@@ -375,8 +376,8 @@ class ConnectDistributedTest(Test):
                        err_msg="Failed to see connector startup in PAUSED state")
 
     @cluster(num_nodes=6)
-    @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL], exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_file_source_and_sink(self, security_protocol, exactly_once_source, connect_protocol):
+    @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL], exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_file_source_and_sink(self, security_protocol, exactly_once_source, connect_protocol, metadata_quorum):
         """
         Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
         correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes.
@@ -409,8 +410,8 @@ class ConnectDistributedTest(Test):
         wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=150, err_msg="Sink output file never converged to the same state as the input file")
 
     @cluster(num_nodes=6)
-    @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_bounce(self, clean, connect_protocol):
+    @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_bounce(self, clean, connect_protocol, metadata_quorum):
         """
         Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
         run correctly and deliver messages exactly once when Kafka Connect workers undergo clean rolling bounces,
@@ -537,8 +538,8 @@ class ConnectDistributedTest(Test):
         assert success, "Found validation errors:\n" + "\n  ".join(errors)
 
     @cluster(num_nodes=6)
-    @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_exactly_once_source(self, clean, connect_protocol):
+    @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_exactly_once_source(self, clean, connect_protocol, metadata_quorum):
         """
         Validates that source tasks run correctly and deliver messages exactly once
         when Kafka Connect workers undergo bounces, both clean and unclean.
@@ -641,8 +642,8 @@ class ConnectDistributedTest(Test):
         assert success, "Found validation errors:\n" + "\n  ".join(errors)
 
     @cluster(num_nodes=6)
-    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_transformations(self, connect_protocol):
+    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_transformations(self, connect_protocol, metadata_quorum):
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services(timestamp_type='CreateTime', include_filestream_connectors=True)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py
index 5e3e69e1c71..65c6da1c699 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -15,6 +15,7 @@
 
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.connect import ConnectDistributedService, ConnectRestError, ConnectServiceBase
+from kafkatest.services.kafka import quorum
 from ducktape.utils.util import wait_until
 from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
@@ -78,8 +79,8 @@ class ConnectRestApiTest(KafkaTest):
                                             include_filestream_connectors=True)
 
     @cluster(num_nodes=4)
-    @matrix(connect_protocol=['compatible', 'eager'])
-    def test_rest_api(self, connect_protocol):
+    @matrix(connect_protocol=['compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_rest_api(self, connect_protocol, metadata_quorum):
         # Template parameters
         self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 4c2a91a6036..06850650e92 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -138,9 +138,11 @@ class ConnectStandaloneFileTest(Test):
             return False
 
     @cluster(num_nodes=5)
-    @parametrize(error_tolerance=ErrorTolerance.ALL)
-    @parametrize(error_tolerance=ErrorTolerance.NONE)
-    def test_skip_and_log_to_dlq(self, error_tolerance):
+    @parametrize(error_tolerance=ErrorTolerance.ALL, metadata_quorum=quorum.zk)
+    @parametrize(error_tolerance=ErrorTolerance.NONE, metadata_quorum=quorum.remote_kraft)
+    @parametrize(error_tolerance=ErrorTolerance.ALL, metadata_quorum=quorum.remote_kraft)
+    @parametrize(error_tolerance=ErrorTolerance.NONE, metadata_quorum=quorum.zk)
+    def test_skip_and_log_to_dlq(self, error_tolerance, metadata_quorum):
         self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk, topics=self.topics)
 
         # set config props
@@ -171,7 +173,8 @@ class ConnectStandaloneFileTest(Test):
         self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE],
                                              include_filestream_connectors=True)
 
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
         self.kafka.start()
 
         self.override_key_converter = "org.apache.kafka.connect.storage.StringConverter"