Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/04 20:07:19 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

vvcephei commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r419693644



##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                         log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                    else:
-                        log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",

Review comment:
       Thanks, @cadonna; I agree. This test should just verify that we first converge on version 7 and then converge on version 8.
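
To illustrate the "first 7, then 8" idea, here is a minimal sketch of an order check over captured log lines. It is not the test's actual code: `converges_in_order` and `SUPPORTED_VERSION` are hypothetical names, and the assumption that a version 8 convergence line contains the same "commonly supported version N" phrase as the version 7 line quoted in the diff is mine, not something the logs confirm.

```python
import re

# Hypothetical pattern: matches the "commonly supported version N" phrase that
# appears in the version-probing log line quoted in the diff. Whether the
# version 8 convergence line uses the same phrase is an assumption.
SUPPORTED_VERSION = re.compile(r"commonly supported version (\d+)")


def converges_in_order(log_lines, expected_versions):
    """Return True if each expected version shows up in the logs, in order."""
    remaining = list(expected_versions)
    for line in log_lines:
        if not remaining:
            break
        match = SUPPORTED_VERSION.search(line)
        # Only consume the next expected version; other versions are ignored.
        if match and int(match.group(1)) == remaining[0]:
            remaining.pop(0)
    return not remaining
```

A check like this looks only at the version numbers and ignores every other log line, so unrelated rebalance messages cannot make it fail.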

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -305,11 +305,11 @@ def test_version_probing_upgrade(self):
         self.driver.disable_auto_terminate()
         # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
         self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
-        self.processor1.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor")
+        self.processor1.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor")

Review comment:
       We can actually just delete these lines now.

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                         log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                    else:
-                        log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                        log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
       I know that this check was here in some fashion before, but I'm drawing a blank on why we need to verify this log line. It seems like _just_ checking the version number logs and nothing else would be the key to a long and happy life.



