You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/11/06 21:41:29 UTC
[kafka] branch trunk updated: MINOR: Modify Connect service's startup timeout to be passed via the init (#5882)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1c5aec6 MINOR: Modify Connect service's startup timeout to be passed via the init (#5882)
1c5aec6 is described below
commit 1c5aec6e9d1df4c3552b8de783845f07bc482a84
Author: Magesh Nandakumar <ma...@gmail.com>
AuthorDate: Tue Nov 6 13:41:19 2018 -0800
MINOR: Modify Connect service's startup timeout to be passed via the init (#5882)
Currently, the startup timeout is hardcoded to 60 seconds in Connect's test service. This change makes it configurable through a new `startup_timeout_sec` argument (default: 60 seconds) on the service constructors. This can safely be backported as well.
Reviewers: Randall Hauch <rh...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
tests/kafkatest/services/connect.py | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index d8c8d5a..bf38e50 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -63,12 +63,13 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
"collect_default": True},
}
- def __init__(self, context, num_nodes, kafka, files):
+ def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec = 60):
super(ConnectServiceBase, self).__init__(context, num_nodes)
self.kafka = kafka
self.security_config = kafka.security_config.client_config()
self.files = files
self.startup_mode = self.STARTUP_MODE_LISTEN
+ self.startup_timeout_sec = startup_timeout_sec
self.environment = {}
self.external_config_template_func = None
@@ -122,13 +123,13 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
def start_and_wait_to_load_plugins(self, node, worker_type, remote_connector_configs):
with node.account.monitor_log(self.LOG_FILE) as monitor:
self.start_and_return_immediately(node, worker_type, remote_connector_configs)
- monitor.wait_until('Kafka version', timeout_sec=60,
+ monitor.wait_until('Kafka version', timeout_sec=self.startup_timeout_sec,
err_msg="Never saw message indicating Kafka Connect finished startup on node: " +
"%s in condition mode: %s" % (str(node.account), self.startup_mode))
def start_and_wait_to_start_listening(self, node, worker_type, remote_connector_configs):
self.start_and_return_immediately(node, worker_type, remote_connector_configs)
- wait_until(lambda: self.listening(node), timeout_sec=60,
+ wait_until(lambda: self.listening(node), timeout_sec=self.startup_timeout_sec,
err_msg="Kafka Connect failed to start on node: %s in condition mode: %s" %
(str(node.account), self.startup_mode))
@@ -141,7 +142,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
node.account.signal(pid, sig, allow_fail=True)
if clean_shutdown:
for pid in pids:
- wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Kafka Connect process on " + str(node.account) + " took too long to exit")
+ wait_until(lambda: not node.account.alive(pid), timeout_sec=self.startup_timeout_sec, err_msg="Kafka Connect process on " + str(
+ node.account) + " took too long to exit")
node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)
@@ -254,8 +256,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
class ConnectStandaloneService(ConnectServiceBase):
"""Runs Kafka Connect in standalone mode."""
- def __init__(self, context, kafka, files):
- super(ConnectStandaloneService, self).__init__(context, 1, kafka, files)
+ def __init__(self, context, kafka, files, startup_timeout_sec = 60):
+ super(ConnectStandaloneService, self).__init__(context, 1, kafka, files, startup_timeout_sec)
# For convenience since this service only makes sense with a single node
@property
@@ -303,8 +305,8 @@ class ConnectDistributedService(ConnectServiceBase):
"""Runs Kafka Connect in distributed mode."""
def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets",
- configs_topic="connect-configs", status_topic="connect-status"):
- super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files)
+ configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec = 60):
+ super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files, startup_timeout_sec)
self.offsets_topic = offsets_topic
self.configs_topic = configs_topic
self.status_topic = status_topic