You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2021/08/27 06:29:31 UTC

[kafka] branch trunk updated: KAFKA-13231; `TransactionalMessageCopier.start_node` should wait until the process is fully started (#11264)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c4e1e23  KAFKA-13231; `TransactionalMessageCopier.start_node` should wait until the process is fully started (#11264)
c4e1e23 is described below

commit c4e1e238574e24aac7ee80f89fb6cb25ecf97ef3
Author: David Jacot <dj...@confluent.io>
AuthorDate: Fri Aug 27 08:28:14 2021 +0200

    KAFKA-13231; `TransactionalMessageCopier.start_node` should wait until the process is fully started (#11264)
    
    This patch ensures that the transactional message copier is fully started before `start_node` returns. Without this, `stop_node` could be called before the process had started, in which case the process would never be stopped.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 tests/kafkatest/services/transactional_message_copier.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/tests/kafkatest/services/transactional_message_copier.py b/tests/kafkatest/services/transactional_message_copier.py
index bea86a6..0717463 100644
--- a/tests/kafkatest/services/transactional_message_copier.py
+++ b/tests/kafkatest/services/transactional_message_copier.py
@@ -159,12 +159,16 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService
     def alive(self, node):
         return len(self.pids(node)) > 0
 
+    def start_node(self, node):
+        BackgroundThreadService.start_node(self, node)
+        wait_until(lambda: self.alive(node), timeout_sec=60, err_msg="Node %s: Message Copier failed to start" % str(node.account))
+
     def kill_node(self, node, clean_shutdown=True):
         pids = self.pids(node)
         sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
         for pid in pids:
             node.account.signal(pid, sig)
-            wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=60, err_msg="Message Copier failed to stop")
+        wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=60, err_msg="Node %s: Message Copier failed to stop" % str(node.account))
 
     def stop_node(self, node, clean_shutdown=True):
         self.kill_node(node, clean_shutdown)