You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/07/20 14:00:12 UTC

[kafka] 02/02: KAFKA-10295: Wait for connector recovery in test_bounce (#9043)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 7c0d87b6b3225778dd8e6a6f7c24983181d53a5d
Author: Greg Harris <gr...@confluent.io>
AuthorDate: Mon Jul 20 06:50:05 2020 -0700

    KAFKA-10295: Wait for connector recovery in test_bounce (#9043)
    
    Signed-off-by: Greg Harris <gr...@confluent.io>
---
 tests/kafkatest/tests/connect/connect_distributed_test.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 244bc64..107c0d4 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -387,8 +387,22 @@ class ConnectDistributedTest(Test):
                 # through the test.
                 time.sleep(15)
 
+        # Wait at least scheduled.rebalance.max.delay.ms to expire and rebalance
+        time.sleep(60)
 
+        # Allow the connectors to startup, recover, and exit cleanly before
+        # ending the test. It's possible for the source connector to make
+        # uncommitted progress, and for the sink connector to read messages that
+        # have not been committed yet, and fail a later assertion.
+        wait_until(lambda: self.is_running(self.source), timeout_sec=30,
+                   err_msg="Failed to see connector transition to the RUNNING state")
+        time.sleep(15)
         self.source.stop()
+        # Ensure that the sink connector has an opportunity to read all
+        # committed messages from the source connector.
+        wait_until(lambda: self.is_running(self.sink), timeout_sec=30,
+                   err_msg="Failed to see connector transition to the RUNNING state")
+        time.sleep(15)
         self.sink.stop()
         self.cc.stop()