You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2021/08/27 06:29:31 UTC
[kafka] branch trunk updated: KAFKA-13231;
`TransactionalMessageCopier.start_node` should wait until the
process is fully started (#11264)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c4e1e23 KAFKA-13231; `TransactionalMessageCopier.start_node` should wait until the process is fully started (#11264)
c4e1e23 is described below
commit c4e1e238574e24aac7ee80f89fb6cb25ecf97ef3
Author: David Jacot <dj...@confluent.io>
AuthorDate: Fri Aug 27 08:28:14 2021 +0200
KAFKA-13231; `TransactionalMessageCopier.start_node` should wait until the process is fully started (#11264)
This patch ensures that the transactional message copier is fully started in `start_node`. Without this, `stop_node` could be called before the process had started, in which case the process would not be stopped at all.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
tests/kafkatest/services/transactional_message_copier.py | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/tests/kafkatest/services/transactional_message_copier.py b/tests/kafkatest/services/transactional_message_copier.py
index bea86a6..0717463 100644
--- a/tests/kafkatest/services/transactional_message_copier.py
+++ b/tests/kafkatest/services/transactional_message_copier.py
@@ -159,12 +159,16 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService
def alive(self, node):
return len(self.pids(node)) > 0
+ def start_node(self, node):
+ BackgroundThreadService.start_node(self, node)
+ wait_until(lambda: self.alive(node), timeout_sec=60, err_msg="Node %s: Message Copier failed to start" % str(node.account))
+
def kill_node(self, node, clean_shutdown=True):
pids = self.pids(node)
sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
for pid in pids:
node.account.signal(pid, sig)
- wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=60, err_msg="Message Copier failed to stop")
+ wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=60, err_msg="Node %s: Message Copier failed to stop" % str(node.account))
def stop_node(self, node, clean_shutdown=True):
self.kill_node(node, clean_shutdown)