Posted to commits@kafka.apache.org by gu...@apache.org on 2022/07/21 19:12:36 UTC

[kafka] branch trunk updated: MINOR: fix flaky test test_standby_tasks_rebalance (#12428)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5e4ae06d12 MINOR: fix flaky test test_standby_tasks_rebalance (#12428)
5e4ae06d12 is described below

commit 5e4ae06d129f01d94ed27f73ea311a2adb0477e7
Author: Hao Li <11...@users.noreply.github.com>
AuthorDate: Thu Jul 21 12:12:29 2022 -0700

    MINOR: fix flaky test test_standby_tasks_rebalance (#12428)
    
    * Description
    In this test, when the third proc joins, other rebalance scenarios can occur: for example, a follow-up joingroup request may arrive before one of the procs has received its syncgroup response, so that proc's previously assigned tasks are lost during the new joingroup request. This can result in standby tasks being assigned as 3, 1, 2. This PR relaxes the expected assignment of 2, 2, 2 to the range [1-3], as sketched below.
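
    As a minimal sketch (not authoritative), assuming wait_for_verification applies the pattern as a regular expression against the processor's stdout, the relaxed check keeps the active count pinned at 2 while accepting any standby count from 1 to 3; the helper name below is illustrative only:

        import re

        # Relaxed expectation: exactly 2 active tasks, 1 to 3 standby tasks.
        RELAXED = re.compile(r"ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]")

        def matches_relaxed_expectation(status_line):
            """Return True if a processor status line satisfies the relaxed check."""
            return RELAXED.search(status_line) is not None

        assert matches_relaxed_expectation("ACTIVE_TASKS:2 STANDBY_TASKS:2")      # even split
        assert matches_relaxed_expectation("ACTIVE_TASKS:2 STANDBY_TASKS:3")      # uneven but legitimate
        assert not matches_relaxed_expectation("ACTIVE_TASKS:2 STANDBY_TASKS:0")  # still rejected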
    
    * Some background from Guozhang:
    I talked to @hao Li offline and also inspected the code a bit, and the tl;dr is that I think the code logic is correct (i.e. we do not really have a bug), but we need to relax the test verification a little bit. The general idea behind the subscription info is that:
    
    When a client joins the group, its subscription tries to encode all of its currently assigned active and standby tasks, which the assignor then uses as the previous active and standby tasks in order to achieve some stickiness.
    
    When a client drops all of its active/standby tasks due to errors, it does not report an empty set in its subscription; instead, it checks its local state directory (see TaskManager#getTaskOffsetSums, which populates the task offset sums). For an active task, the reported offset is "-2", a.k.a. LATEST_OFFSET; for a standby task, it is an actual numerical offset.
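
    As a rough Python sketch of that behavior (a simplified model, not the actual Java TaskManager code; the function and parameter names are illustrative):

        # Sentinel reported for tasks currently owned as active (mirrors LATEST_OFFSET in the Java code).
        LATEST_OFFSET = -2

        def task_offset_sums(active_tasks, standby_offsets, local_state_offsets):
            """Offset sums a client would encode in its subscription.

            active_tasks        -- task ids currently owned as active
            standby_offsets     -- {task_id: offset sum} for currently owned standbys
            local_state_offsets -- {task_id: offset sum} recovered from the local state directory
            """
            sums = dict(local_state_offsets)  # tasks with local state are reported even if no longer owned
            sums.update(standby_offsets)
            for task in active_tasks:
                sums[task] = LATEST_OFFSET
            return sums

        # A client that lost all of its tasks but still has local state for six of them
        # reports all six, so the assignor treats them as six previous standbys.
        lost_everything = task_offset_sums(
            active_tasks=[],
            standby_offsets={},
            local_state_offsets={t: 100 for t in ["0_0", "0_1", "0_2", "1_0", "1_1", "1_2"]},
        )
        assert len(lost_everything) == 6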
    
    So in this case, proc2, which dropped all of its active and standby tasks, would still report every task for which it has some local state. Since it previously owned all six tasks (three as active and three as standby), it would report all six as standbys, and when that happens the resulting assignment, as @hao Li verified, is indeed the uneven one.
    
    So I think the actual “issue“ here is that proc2 is a bit late sending its sync-group request, after the previous rebalance has already completed and a follow-up rebalance has already been triggered; in that case, the resulting uneven assignment is indeed expected. Such a scenario, though not common, is still legitimate, since in practice all kinds of timing skew across instances can happen. So I think we should just relax our verification here, i.e. just making sure that each  [...]
    
    Reviewers: Suhas Satish <ss...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../tests/streams/streams_standby_replica_test.py     | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
index a8c07513c1..c0e5953f73 100644
--- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -73,9 +73,9 @@ class StreamsStandbyTask(BaseStreamsTest):
 
         processor_3.start()
 
-        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_1.STDOUT_FILE)
-        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE)
-        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE)
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_2.STDOUT_FILE)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_3.STDOUT_FILE)
 
         processor_1.stop()
 
@@ -93,9 +93,9 @@ class StreamsStandbyTask(BaseStreamsTest):
 
         processor_2.start()
 
-        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_1.STDOUT_FILE)
-        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE)
-        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE, num_lines=2)
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_2.STDOUT_FILE)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_3.STDOUT_FILE, num_lines=2)
 
         processor_3.stop()
 
@@ -112,10 +112,9 @@ class StreamsStandbyTask(BaseStreamsTest):
         self.wait_for_verification(processor_2, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_2.STDOUT_FILE, num_lines=2)
 
         processor_1.start()
-
-        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_1.STDOUT_FILE)
-        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE)
-        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE, num_lines=2)
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_2.STDOUT_FILE, num_lines=2)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_3.STDOUT_FILE)
 
         self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_1,
                             self.streams_sink_topic_1, self.num_messages)