You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/06/03 21:50:16 UTC

[kafka] branch trunk updated: KAFKA-8473: Adjust Connect system tests for incremental cooperative rebalancing (#6872)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 55d07e7  KAFKA-8473: Adjust Connect system tests for incremental cooperative rebalancing (#6872)
55d07e7 is described below

commit 55d07e717ec3d2a0b79a690d3a3fdf912cf01a26
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Mon Jun 3 14:50:03 2019 -0700

    KAFKA-8473: Adjust Connect system tests for incremental cooperative rebalancing (#6872)
    
    Author: Konstantine Karantasis <ko...@confluent.io>
    Reviewer: Randall Hauch <rh...@gmail.com>
---
 .../tests/connect/connect_distributed_test.py      | 74 +++++++++++++++-------
 tests/kafkatest/tests/connect/connect_rest_test.py |  7 +-
 .../templates/connect-distributed.properties       |  3 +
 3 files changed, 61 insertions(+), 23 deletions(-)

diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index a27b54d..cd1a15d 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -53,6 +53,8 @@ class ConnectDistributedTest(Test):
     STATUS_TOPIC = "connect-status"
     STATUS_REPLICATION_FACTOR = "1"
     STATUS_PARTITIONS = "1"
+    SCHEDULED_REBALANCE_MAX_DELAY_MS = "60000"
+    CONNECT_PROTOCOL="compatible"
 
     # Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same
     # across all nodes.
@@ -155,7 +157,9 @@ class ConnectDistributedTest(Test):
         return self._task_has_state(task_id, status, 'RUNNING')
 
     @cluster(num_nodes=5)
-    def test_restart_failed_connector(self):
+    @matrix(connect_protocol=['compatible', 'eager'])
+    def test_restart_failed_connector(self, connect_protocol):
+        self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
@@ -172,8 +176,9 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see connector transition to the RUNNING state")
 
     @cluster(num_nodes=5)
-    @matrix(connector_type=["source", "sink"])
-    def test_restart_failed_task(self, connector_type):
+    @matrix(connector_type=['source', 'sink'], connect_protocol=['compatible', 'eager'])
+    def test_restart_failed_task(self, connector_type, connect_protocol):
+        self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
@@ -196,12 +201,14 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see task transition to the RUNNING state")
 
     @cluster(num_nodes=5)
-    def test_pause_and_resume_source(self):
+    @matrix(connect_protocol=['compatible', 'eager'])
+    def test_pause_and_resume_source(self, connect_protocol):
         """
         Verify that source connectors stop producing records when paused and begin again after
         being resumed.
         """
 
+        self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
@@ -235,12 +242,14 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to produce messages after resuming source connector")
 
     @cluster(num_nodes=5)
-    def test_pause_and_resume_sink(self):
+    @matrix(connect_protocol=['compatible', 'eager'])
+    def test_pause_and_resume_sink(self, connect_protocol):
         """
         Verify that sink connectors stop consuming records when paused and begin again after
         being resumed.
         """
 
+        self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
@@ -281,11 +290,13 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to consume messages after resuming sink connector")
 
     @cluster(num_nodes=5)
-    def test_pause_state_persistent(self):
+    @matrix(connect_protocol=['compatible', 'eager'])
+    def test_pause_state_persistent(self, connect_protocol):
         """
         Verify that paused state is preserved after a cluster restart.
         """
 
+        self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
@@ -300,21 +311,25 @@ class ConnectDistributedTest(Test):
 
         self.cc.restart()
 
+        if connect_protocol == 'compatible':
+            timeout_sec = 120
+        else:
+            timeout_sec = 30
+
         # we should still be paused after restarting
         for node in self.cc.nodes:
-            wait_until(lambda: self.is_paused(self.source, node), timeout_sec=30,
+            wait_until(lambda: self.is_paused(self.source, node), timeout_sec=timeout_sec,
                        err_msg="Failed to see connector startup in PAUSED state")
 
-    @cluster(num_nodes=5)
-    @parametrize(security_protocol=SecurityConfig.PLAINTEXT)
     @cluster(num_nodes=6)
-    @parametrize(security_protocol=SecurityConfig.SASL_SSL)
-    def test_file_source_and_sink(self, security_protocol):
+    @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL], connect_protocol=['compatible', 'eager'])
+    def test_file_source_and_sink(self, security_protocol, connect_protocol):
         """
         Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
         correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes.
         """
 
+        self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services(security_protocol=security_protocol)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
 
@@ -335,19 +350,25 @@ class ConnectDistributedTest(Test):
         # only processing new data.
         self.cc.restart()
 
+        if connect_protocol == 'compatible':
+            timeout_sec = 150
+        else:
+            timeout_sec = 70
+
         for node in self.cc.nodes:
             node.account.ssh("echo -e -n " + repr(self.SECOND_INPUTS) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=70, err_msg="Sink output file never converged to the same state as the input file")
+        wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=timeout_sec, err_msg="Sink output file never converged to the same state as the input file")
 
     @cluster(num_nodes=6)
-    @matrix(clean=[True, False])
-    def test_bounce(self, clean):
+    @matrix(clean=[True, False], connect_protocol=['compatible', 'eager'])
+    def test_bounce(self, clean, connect_protocol):
         """
         Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
         run correctly and deliver messages exactly once when Kafka Connect workers undergo clean rolling bounces.
         """
         num_tasks = 3
 
+        self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
@@ -449,7 +470,9 @@ class ConnectDistributedTest(Test):
         assert success, "Found validation errors:\n" + "\n  ".join(errors)
 
     @cluster(num_nodes=6)
-    def test_transformations(self):
+    @matrix(connect_protocol=['compatible', 'eager'])
+    def test_transformations(self, connect_protocol):
+        self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services(timestamp_type='CreateTime')
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
@@ -504,18 +527,25 @@ class ConnectDistributedTest(Test):
             assert obj['payload'][ts_fieldname] == ts
 
     @cluster(num_nodes=5)
-    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT)
-    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT)
-    @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT)
-    @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT)
-    @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, security_protocol=SecurityConfig.PLAINTEXT)
-    @parametrize(broker_version=str(LATEST_0_9), auto_create_topics=True, security_protocol=SecurityConfig.PLAINTEXT)
-    def test_broker_compatibility(self, broker_version, auto_create_topics, security_protocol):
+    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
+    @parametrize(broker_version=str(LATEST_0_9), auto_create_topics=True, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='compatible')
+    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
+    @parametrize(broker_version=str(LATEST_0_9), auto_create_topics=True, security_protocol=SecurityConfig.PLAINTEXT, connect_protocol='eager')
+    def test_broker_compatibility(self, broker_version, auto_create_topics, security_protocol, connect_protocol):
         """
         Verify that Connect will start up with various broker versions with various configurations. 
         When Connect distributed starts up, it either creates internal topics (v0.10.1.0 and after) 
         or relies upon the broker to auto-create the topics (v0.10.0.x and before).
         """
+        self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services(broker_version=broker_version, auto_create_topics=auto_create_topics, security_protocol=security_protocol)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
 
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py
index c13515b..13dcf04 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -16,6 +16,7 @@
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.connect import ConnectDistributedService, ConnectRestError, ConnectServiceBase
 from ducktape.utils.util import wait_until
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from ducktape.cluster.remoteaccount import RemoteCommandError
 
@@ -65,6 +66,8 @@ class ConnectRestApiTest(KafkaTest):
 
     SCHEMA = {"type": "string", "optional": False}
 
+    CONNECT_PROTOCOL="compatible"
+
     def __init__(self, test_context):
         super(ConnectRestApiTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
             'test': {'partitions': 1, 'replication-factor': 1}
@@ -73,11 +76,13 @@ class ConnectRestApiTest(KafkaTest):
         self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE])
 
     @cluster(num_nodes=4)
-    def test_rest_api(self):
+    @matrix(connect_protocol=['compatible', 'eager'])
+    def test_rest_api(self, connect_protocol):
         # Template parameters
         self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.schemas = True
+        self.CONNECT_PROTOCOL = connect_protocol
 
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.set_external_configs(lambda node: self.render("connect-file-external.properties", node=node))
diff --git a/tests/kafkatest/tests/connect/templates/connect-distributed.properties b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
index ca8c4f8..e31a54a 100644
--- a/tests/kafkatest/tests/connect/templates/connect-distributed.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
@@ -20,6 +20,9 @@ bootstrap.servers={{ kafka.bootstrap_servers(kafka.security_config.security_prot
 
 group.id={{ group|default("connect-cluster") }}
 
+connect.protocol={{ CONNECT_PROTOCOL|default("compatible") }}
+scheduled.rebalance.max.delay.ms={{ SCHEDULED_REBALANCE_MAX_DELAY_MS|default(60000) }}
+
 key.converter={{ key_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
 value.converter={{ value_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
 {% if key_converter is not defined or key_converter.endswith("JsonConverter") %}