You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/08 00:06:34 UTC

[kafka] 02/02: MINOR: Fix race condition on shutdown of verifiable producer

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 91d229df6011045f92734c3e059888a3d60eb47e
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Jun 7 16:56:21 2019 -0700

    MINOR: Fix race condition on shutdown of verifiable producer
    
    We've seen `ReplicaVerificationToolTest.test_replica_lags` fail occasionally due to errors such as the following:
    ```
    RemoteCommandError: ubuntuworker7: Command 'kill -15 2896' returned non-zero exit status 1. Remote error message: bash: line 0: kill: (2896) - No such process
    ```
    The problem seems to be a shutdown race condition when using `max_messages` with the producer. The process may already be gone which will cause the signal to fail.
    
    Author: Jason Gustafson <ja...@confluent.io>
    
    Reviewers: Gwen Shapira
    
    Closes #6906 from hachikuji/fix-failing-replicat-verification-test
---
 tests/kafkatest/services/verifiable_producer.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 3322d16..893baa4 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -278,7 +278,11 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
             return True
 
     def stop_node(self, node):
-        self.kill_node(node, clean_shutdown=True, allow_fail=False)
+        # There is a race condition on shutdown if using `max_messages` since the
+        # VerifiableProducer will shutdown automatically when all messages have been
+        # written. In this case, the process will be gone and the signal will fail.
+        allow_fail = self.max_messages > 0
+        self.kill_node(node, clean_shutdown=True, allow_fail=allow_fail)
 
         stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
         assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \