You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/07/20 14:00:11 UTC

[kafka] 01/02: KAFKA-10286: Connect system tests should wait for workers to join group (#9040)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 2a2ccdb7ee1e63f901b2c9f0afba9e1efe9bd82f
Author: Greg Harris <gr...@confluent.io>
AuthorDate: Mon Jul 20 06:48:02 2020 -0700

    KAFKA-10286: Connect system tests should wait for workers to join group (#9040)
    
    Currently, the system tests `connect_distributed_test` and `connect_rest_test` only wait for the REST API to come up.
    The startup of the worker includes an asynchronous process for joining the worker group and syncing with other workers.
    There are some situations in which this sync takes an unusually long time, and the test continues without all workers up.
    This leads to flaky test failures: unless the test waits explicitly, worker joins are not given sufficient time to time out and retry.
    
    This changes `ConnectDistributedService` to wait for the `Joined group` message to be printed to the logs before continuing with tests. I've activated this behavior by default, as it's a superset of the checks that were performed by default before.
    
    This log message is present in every version of DistributedHerder that I could find, in slightly different forms, but always with `Joined group` at the beginning of the log message. This change should be safe to backport to any branch.
    
    Signed-off-by: Greg Harris <gr...@confluent.io>
    Author: Greg Harris <gr...@confluent.io>
    Reviewer: Randall Hauch <rh...@gmail.com>
---
 tests/kafkatest/services/connect.py | 25 +++++++++++++++++++++----
 1 file changed, 21 insertions(+), 4 deletions(-)

diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index b056358..6ee6ade 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -44,13 +44,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     CONNECT_REST_PORT = 8083
     HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "connect_heap_dump.bin")
 
-    # Currently the Connect worker supports waiting on three modes:
+    # Currently the Connect worker supports waiting on four modes:
     STARTUP_MODE_INSTANT = 'INSTANT'
     """STARTUP_MODE_INSTANT: Start Connect worker and return immediately"""
     STARTUP_MODE_LOAD = 'LOAD'
     """STARTUP_MODE_LOAD: Start Connect worker and return after discovering and loading plugins"""
     STARTUP_MODE_LISTEN = 'LISTEN'
     """STARTUP_MODE_LISTEN: Start Connect worker and return after opening the REST port."""
+    STARTUP_MODE_JOIN = 'JOIN'
+    """STARTUP_MODE_JOIN: Start Connect worker and return after joining the group."""
 
     logs = {
         "connect_log": {
@@ -115,8 +117,9 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
             self.logger.debug("REST resources are not loaded yet")
             return False
 
-    def start(self, mode=STARTUP_MODE_LISTEN):
-        self.startup_mode = mode
+    def start(self, mode=None):
+        if mode:
+            self.startup_mode = mode
         super(ConnectServiceBase, self).start()
 
     def start_and_return_immediately(self, node, worker_type, remote_connector_configs):
@@ -137,6 +140,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
                    err_msg="Kafka Connect failed to start on node: %s in condition mode: %s" %
                    (str(node.account), self.startup_mode))
 
+    def start_and_wait_to_join_group(self, node, worker_type, remote_connector_configs):
+        if worker_type != 'distributed':
+            raise RuntimeError("Cannot wait for joined group message for %s" % worker_type)
+        with node.account.monitor_log(self.LOG_FILE) as monitor:
+            self.start_and_return_immediately(node, worker_type, remote_connector_configs)
+            monitor.wait_until('Joined group', timeout_sec=self.startup_timeout_sec,
+                               err_msg="Never saw message indicating Kafka Connect joined group on node: " +
+                                       "%s in condition mode: %s" % (str(node.account), self.startup_mode))
+
     def stop_node(self, node, clean_shutdown=True):
         self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Kafka Connect on " + str(node.account))
         pids = self.pids(node)
@@ -310,6 +322,8 @@ class ConnectStandaloneService(ConnectServiceBase):
             self.start_and_wait_to_load_plugins(node, 'standalone', remote_connector_configs)
         elif self.startup_mode == self.STARTUP_MODE_INSTANT:
             self.start_and_return_immediately(node, 'standalone', remote_connector_configs)
+        elif self.startup_mode == self.STARTUP_MODE_JOIN:
+            self.start_and_wait_to_join_group(node, 'standalone', remote_connector_configs)
         else:
             # The default mode is to wait until the complete startup of the worker
             self.start_and_wait_to_start_listening(node, 'standalone', remote_connector_configs)
@@ -324,6 +338,7 @@ class ConnectDistributedService(ConnectServiceBase):
     def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets",
                  configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec = 60):
         super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files, startup_timeout_sec)
+        self.startup_mode = self.STARTUP_MODE_JOIN
         self.offsets_topic = offsets_topic
         self.configs_topic = configs_topic
         self.status_topic = status_topic
@@ -357,9 +372,11 @@ class ConnectDistributedService(ConnectServiceBase):
             self.start_and_wait_to_load_plugins(node, 'distributed', '')
         elif self.startup_mode == self.STARTUP_MODE_INSTANT:
             self.start_and_return_immediately(node, 'distributed', '')
+        elif self.startup_mode == self.STARTUP_MODE_LISTEN:
+            self.start_and_wait_to_start_listening(node, 'distributed', '')
         else:
             # The default mode is to wait until the complete startup of the worker
-            self.start_and_wait_to_start_listening(node, 'distributed', '')
+            self.start_and_wait_to_join_group(node, 'distributed', '')
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")