You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/11/06 21:59:28 UTC

[kafka] branch 1.1 updated: MINOR: Modify Connect service's startup timeout to be passed via the init (#5882)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 3a49247  MINOR: Modify Connect service's startup timeout to be passed via the init (#5882)
3a49247 is described below

commit 3a492474db20d5276dd96b54d557934a8d5bf312
Author: Magesh Nandakumar <ma...@gmail.com>
AuthorDate: Tue Nov 6 13:41:19 2018 -0800

    MINOR: Modify Connect service's startup timeout to be passed via the init (#5882)
    
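    Currently, the startup timeout is hardcoded to 60 seconds in Connect's test service. This change makes it configurable through a new `startup_timeout_sec` argument to `__init__` (default: 60) on the base, standalone, and distributed Connect services. Note that the same value now also bounds the wait for the Connect process to exit during a clean shutdown, which previously used its own hardcoded 60 seconds. This can safely be backported as well.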
    
    Reviewers: Randall Hauch <rh...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 tests/kafkatest/services/connect.py | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index d7ef204..69690da 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -62,12 +62,13 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
             "collect_default": True},
     }
 
-    def __init__(self, context, num_nodes, kafka, files):
+    def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec = 60):
         super(ConnectServiceBase, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.security_config = kafka.security_config.client_config()
         self.files = files
         self.startup_mode = self.STARTUP_MODE_LISTEN
+        self.startup_timeout_sec = startup_timeout_sec
         self.environment = {}
 
     def pids(self, node):
@@ -109,13 +110,13 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     def start_and_wait_to_load_plugins(self, node, worker_type, remote_connector_configs):
         with node.account.monitor_log(self.LOG_FILE) as monitor:
             self.start_and_return_immediately(node, worker_type, remote_connector_configs)
-            monitor.wait_until('Kafka version', timeout_sec=60,
+            monitor.wait_until('Kafka version', timeout_sec=self.startup_timeout_sec,
                                err_msg="Never saw message indicating Kafka Connect finished startup on node: " +
                                        "%s in condition mode: %s" % (str(node.account), self.startup_mode))
 
     def start_and_wait_to_start_listening(self, node, worker_type, remote_connector_configs):
         self.start_and_return_immediately(node, worker_type, remote_connector_configs)
-        wait_until(lambda: self.listening(node), timeout_sec=60,
+        wait_until(lambda: self.listening(node), timeout_sec=self.startup_timeout_sec,
                    err_msg="Kafka Connect failed to start on node: %s in condition mode: %s" %
                    (str(node.account), self.startup_mode))
 
@@ -128,7 +129,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
             node.account.signal(pid, sig, allow_fail=True)
         if clean_shutdown:
             for pid in pids:
-                wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Kafka Connect process on " + str(node.account) + " took too long to exit")
+                wait_until(lambda: not node.account.alive(pid), timeout_sec=self.startup_timeout_sec, err_msg="Kafka Connect process on " + str(
+                    node.account) + " took too long to exit")
 
         node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)
 
@@ -241,8 +243,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
 class ConnectStandaloneService(ConnectServiceBase):
     """Runs Kafka Connect in standalone mode."""
 
-    def __init__(self, context, kafka, files):
-        super(ConnectStandaloneService, self).__init__(context, 1, kafka, files)
+    def __init__(self, context, kafka, files, startup_timeout_sec = 60):
+        super(ConnectStandaloneService, self).__init__(context, 1, kafka, files, startup_timeout_sec)
 
     # For convenience since this service only makes sense with a single node
     @property
@@ -288,8 +290,8 @@ class ConnectDistributedService(ConnectServiceBase):
     """Runs Kafka Connect in distributed mode."""
 
     def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets",
-                 configs_topic="connect-configs", status_topic="connect-status"):
-        super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files)
+                 configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec = 60):
+        super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files, startup_timeout_sec)
         self.offsets_topic = offsets_topic
         self.configs_topic = configs_topic
         self.status_topic = status_topic