Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/08 00:06:34 UTC
[kafka] 02/02: MINOR: Fix race condition on shutdown of verifiable producer
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 91d229df6011045f92734c3e059888a3d60eb47e
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Jun 7 16:56:21 2019 -0700
MINOR: Fix race condition on shutdown of verifiable producer
We've seen `ReplicaVerificationToolTest.test_replica_lags` fail occasionally due to errors such as the following:
```
RemoteCommandError: ubuntuworker7: Command 'kill -15 2896' returned non-zero exit status 1. Remote error message: bash: line 0: kill: (2896) - No such process
```
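The problem seems to be a race condition on shutdown when the producer is run with `max_messages`. The producer exits on its own once all messages have been written, so by the time the test sends the signal, the process may already be gone, and the `kill` command fails.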
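A minimal Python sketch of the failure mode and the fix, assuming a local process and `os.kill` in place of the remote `kill -15` shown in the error above. Only the rule `allow_fail = max_messages > 0` comes from the patch; the functions `send_sigterm`, `stop_producer` and `already_exited` are hypothetical. The race is simulated with a stand-in for `os.kill`, so running the example never signals a real process.

```python
import os
import signal


def send_sigterm(pid, allow_fail, kill=os.kill):
    """Send SIGTERM to ``pid`` (hypothetical helper).

    Returns True if the signal was sent, and False if the process was
    already gone and ``allow_fail`` is set. Otherwise the error propagates,
    much like a non-zero exit status from ``kill -15`` fails the test.
    """
    try:
        kill(pid, signal.SIGTERM)
        return True
    except ProcessLookupError:
        # Local equivalent of "kill: (PID) - No such process".
        if not allow_fail:
            raise
        return False


def stop_producer(pid, max_messages, kill=os.kill):
    # Same rule as the patch: a producer bounded by max_messages may exit on
    # its own before it is signalled, so a failed signal is tolerated.
    allow_fail = max_messages > 0
    return send_sigterm(pid, allow_fail, kill)


def already_exited(pid, sig):
    # Hypothetical stand-in for os.kill that simulates the race: the producer
    # finished writing its messages and exited before the signal was sent.
    raise ProcessLookupError(3, "No such process")


if __name__ == "__main__":
    # Bounded producer: the missing process is tolerated.
    print(stop_producer(2896, max_messages=100, kill=already_exited))  # False

    # Unbounded producer (max_messages <= 0): a missing process is an error.
    try:
        stop_producer(2896, max_messages=-1, kill=already_exited)
    except ProcessLookupError as e:
        print("error:", e.strerror)  # error: No such process
```

Injecting the `kill` function keeps the example deterministic. Sending a real signal to a PID whose process has already been reaped risks hitting an unrelated process if the PID has been reused.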
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Gwen Shapira
Closes #6906 from hachikuji/fix-failing-replicat-verification-test
---
tests/kafkatest/services/verifiable_producer.py | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 3322d16..893baa4 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -278,7 +278,11 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
return True
     def stop_node(self, node):
-        self.kill_node(node, clean_shutdown=True, allow_fail=False)
+        # There is a race condition on shutdown if using `max_messages` since the
+        # VerifiableProducer will shutdown automatically when all messages have been
+        # written. In this case, the process will be gone and the signal will fail.
+        allow_fail = self.max_messages > 0
+        self.kill_node(node, clean_shutdown=True, allow_fail=allow_fail)
         stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
         assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \