You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2022/10/27 16:21:14 UTC
[kafka] branch 3.3 updated: MINOR: Migrate connect system tests to KRaft (#12621)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new e3f89542471 MINOR: Migrate connect system tests to KRaft (#12621)
e3f89542471 is described below
commit e3f89542471919fbe80b1f4109eea0059f766fda
Author: srishti-saraswat <98...@users.noreply.github.com>
AuthorDate: Thu Oct 27 21:49:14 2022 +0530
MINOR: Migrate connect system tests to KRaft (#12621)
Adds the `metadata_quorum` parameter to the `@matrix(...)` and `@parametrize(...)` annotations of many existing Connect system tests, so that they run with both the ZooKeeper (`quorum.zk`) and `quorum.remote_kraft` metadata quorums.
Reviewers: Randall Hauch <rh...@gmail.com>, Greg Harris <gh...@gmail.com>
---
.../tests/connect/connect_distributed_test.py | 51 +++++++++++-----------
tests/kafkatest/tests/connect/connect_rest_test.py | 5 ++-
tests/kafkatest/tests/connect/connect_test.py | 11 +++--
3 files changed, 36 insertions(+), 31 deletions(-)
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 8347afc8d66..3854573fc5b 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -20,7 +20,7 @@ from ducktape.mark import matrix, parametrize
from ducktape.cluster.remoteaccount import RemoteCommandError
from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService, config_property
+from kafkatest.services.kafka import KafkaService, config_property, quorum
from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink, ConnectRestError, MockSink, MockSource
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.security.security_config import SecurityConfig
@@ -75,7 +75,7 @@ class ConnectDistributedTest(Test):
self.TOPIC: {'partitions': 1, 'replication-factor': 1}
}
- self.zk = ZookeeperService(test_context, self.num_zk)
+ self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None
self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
@@ -98,7 +98,8 @@ class ConnectDistributedTest(Test):
include_filestream_connectors=include_filestream_connectors)
self.cc.log_level = "DEBUG"
- self.zk.start()
+ if self.zk:
+ self.zk.start()
self.kafka.start()
def _start_connector(self, config_file):
@@ -164,8 +165,8 @@ class ConnectDistributedTest(Test):
return self._task_has_state(task_id, status, 'RUNNING')
@cluster(num_nodes=5)
- @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
- def test_restart_failed_connector(self, exactly_once_source, connect_protocol):
+ @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+ def test_restart_failed_connector(self, exactly_once_source, connect_protocol, metadata_quorum):
self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled'
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services()
@@ -187,8 +188,8 @@ class ConnectDistributedTest(Test):
err_msg="Failed to see connector transition to the RUNNING state")
@cluster(num_nodes=5)
- @matrix(connector_type=['source', 'exactly-once source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'])
- def test_restart_failed_task(self, connector_type, connect_protocol):
+ @matrix(connector_type=['source', 'exactly-once source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+ def test_restart_failed_task(self, connector_type, connect_protocol, metadata_quorum):
self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if connector_type == 'exactly-once source' else 'disabled'
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services()
@@ -213,8 +214,8 @@ class ConnectDistributedTest(Test):
err_msg="Failed to see task transition to the RUNNING state")
@cluster(num_nodes=5)
- @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
- def test_restart_connector_and_tasks_failed_connector(self, connect_protocol):
+ @matrix(connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+ def test_restart_connector_and_tasks_failed_connector(self, connect_protocol, metadata_quorum):
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services()
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -232,8 +233,8 @@ class ConnectDistributedTest(Test):
err_msg="Failed to see connector transition to the RUNNING state")
@cluster(num_nodes=5)
- @matrix(connector_type=['source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'])
- def test_restart_connector_and_tasks_failed_task(self, connector_type, connect_protocol):
+ @matrix(connector_type=['source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+ def test_restart_connector_and_tasks_failed_task(self, connector_type, connect_protocol, metadata_quorum):
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services()
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -257,8 +258,8 @@ class ConnectDistributedTest(Test):
err_msg="Failed to see task transition to the RUNNING state")
@cluster(num_nodes=5)
- @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
- def test_pause_and_resume_source(self, exactly_once_source, connect_protocol):
+ @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+ def test_pause_and_resume_source(self, exactly_once_source, connect_protocol, metadata_quorum):
"""
Verify that source connectors stop producing records when paused and begin again after
being resumed.
@@ -299,8 +300,8 @@ class ConnectDistributedTest(Test):
err_msg="Failed to produce messages after resuming source connector")
@cluster(num_nodes=5)
- @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
- def test_pause_and_resume_sink(self, connect_protocol):
+ @matrix(connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+ def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum):
"""
Verify that sink connectors stop consuming records when paused and begin again after
being resumed.
@@ -347,8 +348,8 @@ class ConnectDistributedTest(Test):
err_msg="Failed to consume messages after resuming sink connector")
@cluster(num_nodes=5)
- @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
- def test_pause_state_persistent(self, exactly_once_source, connect_protocol):
+ @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+ def test_pause_state_persistent(self, exactly_once_source, connect_protocol, metadata_quorum):
"""
Verify that paused state is preserved after a cluster restart.
"""
@@ -375,8 +376,8 @@ class ConnectDistributedTest(Test):
err_msg="Failed to see connector startup in PAUSED state")
@cluster(num_nodes=6)
- @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL], exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
- def test_file_source_and_sink(self, security_protocol, exactly_once_source, connect_protocol):
+ @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL], exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+ def test_file_source_and_sink(self, security_protocol, exactly_once_source, connect_protocol, metadata_quorum):
"""
Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes.
@@ -409,8 +410,8 @@ class ConnectDistributedTest(Test):
wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=150, err_msg="Sink output file never converged to the same state as the input file")
@cluster(num_nodes=6)
- @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
- def test_bounce(self, clean, connect_protocol):
+ @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+ def test_bounce(self, clean, connect_protocol, metadata_quorum):
"""
Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
run correctly and deliver messages exactly once when Kafka Connect workers undergo clean rolling bounces,
@@ -537,8 +538,8 @@ class ConnectDistributedTest(Test):
assert success, "Found validation errors:\n" + "\n ".join(errors)
@cluster(num_nodes=6)
- @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
- def test_exactly_once_source(self, clean, connect_protocol):
+ @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+ def test_exactly_once_source(self, clean, connect_protocol, metadata_quorum):
"""
Validates that source tasks run correctly and deliver messages exactly once
when Kafka Connect workers undergo bounces, both clean and unclean.
@@ -641,8 +642,8 @@ class ConnectDistributedTest(Test):
assert success, "Found validation errors:\n" + "\n ".join(errors)
@cluster(num_nodes=6)
- @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
- def test_transformations(self, connect_protocol):
+ @matrix(connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+ def test_transformations(self, connect_protocol, metadata_quorum):
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services(timestamp_type='CreateTime', include_filestream_connectors=True)
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py
index 5e3e69e1c71..65c6da1c699 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -15,6 +15,7 @@
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.connect import ConnectDistributedService, ConnectRestError, ConnectServiceBase
+from kafkatest.services.kafka import quorum
from ducktape.utils.util import wait_until
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
@@ -78,8 +79,8 @@ class ConnectRestApiTest(KafkaTest):
include_filestream_connectors=True)
@cluster(num_nodes=4)
- @matrix(connect_protocol=['compatible', 'eager'])
- def test_rest_api(self, connect_protocol):
+ @matrix(connect_protocol=['compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+ def test_rest_api(self, connect_protocol, metadata_quorum):
# Template parameters
self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 4c2a91a6036..06850650e92 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -138,9 +138,11 @@ class ConnectStandaloneFileTest(Test):
return False
@cluster(num_nodes=5)
- @parametrize(error_tolerance=ErrorTolerance.ALL)
- @parametrize(error_tolerance=ErrorTolerance.NONE)
- def test_skip_and_log_to_dlq(self, error_tolerance):
+ @parametrize(error_tolerance=ErrorTolerance.ALL, metadata_quorum=quorum.zk)
+ @parametrize(error_tolerance=ErrorTolerance.NONE, metadata_quorum=quorum.remote_kraft)
+ @parametrize(error_tolerance=ErrorTolerance.ALL, metadata_quorum=quorum.remote_kraft)
+ @parametrize(error_tolerance=ErrorTolerance.NONE, metadata_quorum=quorum.zk)
+ def test_skip_and_log_to_dlq(self, error_tolerance, metadata_quorum):
self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk, topics=self.topics)
# set config props
@@ -171,7 +173,8 @@ class ConnectStandaloneFileTest(Test):
self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE],
include_filestream_connectors=True)
- self.zk.start()
+ if self.zk:
+ self.zk.start()
self.kafka.start()
self.override_key_converter = "org.apache.kafka.connect.storage.StringConverter"