You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/04/26 15:21:04 UTC

[kafka] branch 1.0 updated: MINOR: Backport two system test changes for Connect to give more startup control (#6638)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new f763aa7  MINOR: Backport two system test changes for Connect to give more startup control (#6638)
f763aa7 is described below

commit f763aa7cc16bc3c9e31abf143fa71d29931e5ad9
Author: Cyrus Vafadari <cy...@alum.mit.edu>
AuthorDate: Fri Apr 26 08:20:46 2019 -0700

    MINOR: Backport two system test changes for Connect to give more startup control (#6638)
    
    This merge consists of two commits previously merged into later branches.
    Author: Cyrus Vafadari <cy...@confluent.io>
    Reviewers: Randall Hauch <rh...@gmail.com>
    
    Commit #1:
    MINOR: Add async and different sync startup modes in connect service test class
    
    Allow the Connect service in system tests to start asynchronously.
    
    Specifically, allow for three startup conditions:
    1. No condition - start async and return immediately.
    2. Semi-async - start returns once plugins have been discovered and loaded successfully.
    3. Sync - start returns after the worker has completed startup. This remains the default mode, but its readiness condition is improved: it now checks that the port of Connect's REST interface is open, rather than waiting for a log line to appear in the logs.
    
    Author: Konstantine Karantasis <ko...@confluent.io>
    Reviewers: Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
    Closes #4423 from kkonstantine/MINOR-Add-async-and-different-sync-startup-modes-in-ConnectService-test-class
    
    Commit #2:
    MINOR: Modify Connect service's startup timeout to be passed via the init (#5882)
    
    Currently, the startup timeout is hard-coded to 60 seconds in Connect's test service. This change makes it configurable via the service's `__init__` (it still defaults to 60 seconds).
    
    Author: Magesh Nandakumar <ma...@confluent.io>
    Reviewers: Randall Hauch <rh...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 tests/kafkatest/services/connect.py | 86 +++++++++++++++++++++++++++++--------
 1 file changed, 68 insertions(+), 18 deletions(-)

diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 399e53c..69690da 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -20,6 +20,7 @@ import signal
 import time
 
 import requests
+from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.errors import DucktapeError
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
@@ -39,6 +40,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     STDERR_FILE = os.path.join(PERSISTENT_ROOT, "connect.stderr")
     LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "connect-log4j.properties")
     PID_FILE = os.path.join(PERSISTENT_ROOT, "connect.pid")
+    CONNECT_REST_PORT = 8083
+
+    # Currently the Connect worker supports waiting on three modes:
+    STARTUP_MODE_INSTANT = 'INSTANT'
+    """STARTUP_MODE_INSTANT: Start Connect worker and return immediately"""
+    STARTUP_MODE_LOAD = 'LOAD'
+    """STARTUP_MODE_LOAD: Start Connect worker and return after discovering and loading plugins"""
+    STARTUP_MODE_LISTEN = 'LISTEN'
+    """STARTUP_MODE_LISTEN: Start Connect worker and return after opening the REST port."""
 
     logs = {
         "connect_log": {
@@ -52,11 +62,13 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
             "collect_default": True},
     }
 
-    def __init__(self, context, num_nodes, kafka, files):
+    def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec = 60):
         super(ConnectServiceBase, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.security_config = kafka.security_config.client_config()
         self.files = files
+        self.startup_mode = self.STARTUP_MODE_LISTEN
+        self.startup_timeout_sec = startup_timeout_sec
         self.environment = {}
 
     def pids(self, node):
@@ -76,6 +88,38 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
         self.config_template_func = config_template_func
         self.connector_config_templates = connector_config_templates
 
+    def listening(self, node):
+        try:
+            cmd = "nc -z %s %s" % (node.account.hostname, self.CONNECT_REST_PORT)
+            node.account.ssh_output(cmd, allow_fail=False)
+            self.logger.debug("Connect worker started accepting connections at: '%s:%s')", node.account.hostname,
+                              self.CONNECT_REST_PORT)
+            return True
+        except (RemoteCommandError, ValueError) as e:
+            return False
+
+    def start(self, mode=STARTUP_MODE_LISTEN):
+        self.startup_mode = mode
+        super(ConnectServiceBase, self).start()
+
+    def start_and_return_immediately(self, node, worker_type, remote_connector_configs):
+        cmd = self.start_cmd(node, remote_connector_configs)
+        self.logger.debug("Connect %s command: %s", worker_type, cmd)
+        node.account.ssh(cmd)
+
+    def start_and_wait_to_load_plugins(self, node, worker_type, remote_connector_configs):
+        with node.account.monitor_log(self.LOG_FILE) as monitor:
+            self.start_and_return_immediately(node, worker_type, remote_connector_configs)
+            monitor.wait_until('Kafka version', timeout_sec=self.startup_timeout_sec,
+                               err_msg="Never saw message indicating Kafka Connect finished startup on node: " +
+                                       "%s in condition mode: %s" % (str(node.account), self.startup_mode))
+
+    def start_and_wait_to_start_listening(self, node, worker_type, remote_connector_configs):
+        self.start_and_return_immediately(node, worker_type, remote_connector_configs)
+        wait_until(lambda: self.listening(node), timeout_sec=self.startup_timeout_sec,
+                   err_msg="Kafka Connect failed to start on node: %s in condition mode: %s" %
+                   (str(node.account), self.startup_mode))
+
     def stop_node(self, node, clean_shutdown=True):
         self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Kafka Connect on " + str(node.account))
         pids = self.pids(node)
@@ -85,7 +129,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
             node.account.signal(pid, sig, allow_fail=True)
         if clean_shutdown:
             for pid in pids:
-                wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Kafka Connect process on " + str(node.account) + " took too long to exit")
+                wait_until(lambda: not node.account.alive(pid), timeout_sec=self.startup_timeout_sec, err_msg="Kafka Connect process on " + str(
+                    node.account) + " took too long to exit")
 
         node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)
 
@@ -192,14 +237,14 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
         raise exception_to_throw
 
     def _base_url(self, node):
-        return 'http://' + node.account.externally_routable_ip + ':' + '8083'
+        return 'http://' + node.account.externally_routable_ip + ':' + str(self.CONNECT_REST_PORT)
 
 
 class ConnectStandaloneService(ConnectServiceBase):
     """Runs Kafka Connect in standalone mode."""
 
-    def __init__(self, context, kafka, files):
-        super(ConnectStandaloneService, self).__init__(context, 1, kafka, files)
+    def __init__(self, context, kafka, files, startup_timeout_sec = 60):
+        super(ConnectStandaloneService, self).__init__(context, 1, kafka, files, startup_timeout_sec)
 
     # For convenience since this service only makes sense with a single node
     @property
@@ -229,11 +274,13 @@ class ConnectStandaloneService(ConnectServiceBase):
             remote_connector_configs.append(target_file)
 
         self.logger.info("Starting Kafka Connect standalone process on " + str(node.account))
-        with node.account.monitor_log(self.LOG_FILE) as monitor:
-            cmd = self.start_cmd(node, remote_connector_configs)
-            self.logger.debug("Connect standalone command: %s", cmd)
-            node.account.ssh(cmd)
-            monitor.wait_until('Kafka Connect started', timeout_sec=60, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
+        if self.startup_mode == self.STARTUP_MODE_LOAD:
+            self.start_and_wait_to_load_plugins(node, 'standalone', remote_connector_configs)
+        elif self.startup_mode == self.STARTUP_MODE_INSTANT:
+            self.start_and_return_immediately(node, 'standalone', remote_connector_configs)
+        else:
+            # The default mode is to wait until the complete startup of the worker
+            self.start_and_wait_to_start_listening(node, 'standalone', remote_connector_configs)
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")
@@ -243,13 +290,14 @@ class ConnectDistributedService(ConnectServiceBase):
     """Runs Kafka Connect in distributed mode."""
 
     def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets",
-                 configs_topic="connect-configs", status_topic="connect-status"):
-        super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files)
+                 configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec = 60):
+        super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files, startup_timeout_sec)
         self.offsets_topic = offsets_topic
         self.configs_topic = configs_topic
         self.status_topic = status_topic
 
-    def start_cmd(self, node):
+    # connector_configs argument is intentionally ignored in distributed service.
+    def start_cmd(self, node, connector_configs):
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
         cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
         for envvar in self.environment:
@@ -268,11 +316,13 @@ class ConnectDistributedService(ConnectServiceBase):
             raise DucktapeError("Config files are not valid in distributed mode, submit connectors via the REST API")
 
         self.logger.info("Starting Kafka Connect distributed process on " + str(node.account))
-        with node.account.monitor_log(self.LOG_FILE) as monitor:
-            cmd = self.start_cmd(node)
-            self.logger.debug("Connect distributed command: %s", cmd)
-            node.account.ssh(cmd)
-            monitor.wait_until('Kafka Connect started', timeout_sec=60, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
+        if self.startup_mode == self.STARTUP_MODE_LOAD:
+            self.start_and_wait_to_load_plugins(node, 'distributed', '')
+        elif self.startup_mode == self.STARTUP_MODE_INSTANT:
+            self.start_and_return_immediately(node, 'distributed', '')
+        else:
+            # The default mode is to wait until the complete startup of the worker
+            self.start_and_wait_to_start_listening(node, 'distributed', '')
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")