Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/04 14:44:28 UTC

[GitHub] [kafka] cadonna opened a new pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

cadonna opened a new pull request #8613:
URL: https://github.com/apache/kafka/pull/8613


   This PR sets HighAvailabilityTaskAssignor as the default task assignor in
   streams_upgrade_test.py. The verification in the test needed to be
   modified because the HighAvailabilityTaskAssignor surfaced a flakiness
   in the test. More precisely, the verifications assume that the last
   client that is bounced joins the group before the other two clients
   are able to rebalance without the last client. This assumption does not
   always hold.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#issuecomment-624218014


   test this please





[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r419496703



##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                         log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                    else:
-                        log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",

Review comment:
       This verification is only true if the two other processors haven't rebalanced before the processor that bounced last re-joins the group. If the rebalance occurs, the commonly supported version is already at 8 when the last processor joins.
   
   Actually, the test `test_version_probing_upgrade` is independent of the task assignor used, but this issue was surfaced by the `HighAvailabilityTaskAssignor` and not by the `StickyTaskAssignor`. I cannot say for sure why.
   
   IMO, removing this verification should be OK, since afterwards we check whether the processors have synchronized generations, which means that all three processors successfully joined the group in the end. The state that we no longer verify explicitly is the transient state where version 7 is currently used, but all processors are able to use version 8.
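
For illustration, the "synchronized generations" check mentioned above could look roughly like the sketch below. It is not the code in streams_upgrade_test.py: the helper names are hypothetical, and the log line format ("Successfully joined group with generation N") is an assumption that may differ between client versions.

```python
import re

# Assumed log format; the real consumer log line may differ between versions.
GENERATION_PATTERN = re.compile(r"Successfully joined group with generation (\d+)")


def latest_generation(log_text):
    """Return the highest generation found in one processor's log, or None."""
    generations = [int(g) for g in GENERATION_PATTERN.findall(log_text)]
    return max(generations) if generations else None


def generations_synchronized(logs):
    """True if every processor has joined and all report the same latest generation."""
    latest = [latest_generation(text) for text in logs]
    return bool(latest) and None not in latest and len(set(latest)) == 1
```

A check like this only tells us that all processors ended up in the same generation; it says nothing about which metadata version was in use during the intermediate rebalances.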







[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420052853



##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                         log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                    else:
-                        log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                        log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
       I guess what we really would need is a way to check if a group stabilized. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned, because in that specific rebalance the corresponding partitions were revoked from the other processors. So we would actually need to check if they have the highest generation in sync across the processors AND if all processors have at least one task assigned (AND if all tasks were assigned).







[GitHub] [kafka] cadonna commented on pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#issuecomment-623520934


   Call for review: @vvcephei @ableegoldman 





[GitHub] [kafka] vvcephei commented on pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#issuecomment-625429232


   System test passed again: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-05-05--001.1588709838--cadonna--fix_version_probing_system_test--b80ef74/report.html
   
   





[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420052853



##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                         log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                    else:
-                        log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                        log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
       We can leave it because it verifies whether the assignment was triggered in the assignor, which is better than nothing. However, it does not give us any guarantee that the rebalance actually took place.
   
   I guess what we really would need is a way to check if a group stabilized and if the assignment is valid. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned. So we would actually need to check if they have the highest generation in sync across the processors AND if all processors have at least one task assigned (AND if all tasks were assigned).   
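
The three conditions listed above (same generation everywhere, every processor has at least one task, every task is assigned) can be written down as a single predicate. The following is only a sketch under stated assumptions: `group_is_stable` and the shape of `members` are hypothetical, and how the generation and task sets would be collected from the processors is left open.

```python
def group_is_stable(members, all_tasks):
    """Check whether a group snapshot looks stable and its assignment valid.

    members maps a processor name to a dict with the keys
    "generation" (int) and "tasks" (set of task ids); this layout is an
    assumption made for the sketch. all_tasks is the set of expected task ids.
    """
    if not members:
        return False
    # All processors report the same (latest) generation.
    same_generation = len({m["generation"] for m in members.values()}) == 1
    # No processor is left without work.
    everyone_has_work = all(m["tasks"] for m in members.values())
    # The union of all assignments covers exactly the expected tasks.
    assigned = set().union(*(m["tasks"] for m in members.values()))
    return same_generation and everyone_has_work and assigned == set(all_tasks)
```

With such a predicate, the case described above (equal generations but one processor without tasks) would be reported as not stable instead of passing the generation check.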







[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420052853



##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                         log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                    else:
-                        log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                        log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
       We can leave it because it verifies whether the assignment was triggered in the assignor. However, it does not give us any guarantee that the rebalance actually took place.
   
   I guess what we really would need is a way to check if a group stabilized. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned, because in that specific rebalance the corresponding partitions were revoked from the other processors. So we would actually need to check if they have the highest generation in sync across the processors AND if all processors have at least one task assigned (AND if all tasks were assigned).







[GitHub] [kafka] cadonna commented on pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#issuecomment-623525749


   System tests job: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3928/





[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420052853



##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                         log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                    else:
-                        log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                        log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
       We can leave it because it verifies whether the assignment was triggered in the assignor. However, it does not give us any guarantee that the rebalance actually took place.
   
   I guess what we really would need is a way to check if a group stabilized and if the assignment is valid. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned, because in that specific rebalance the corresponding partitions were revoked from the other processors. So we would actually need to check if they have the highest generation in sync across the processors AND if all processors have at least one task assigned (AND if all tasks were assigned).







[GitHub] [kafka] ableegoldman commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r419829020



##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                         log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                    else:
-                        log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                        log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
       I think the idea is to verify that the actual version probing rebalance takes place, i.e. that the partition assignor actually handles the version probing once it's detected, and that it signals to the stream thread, which in turn also handles it correctly. But idk -- I've probably broken and fixed the version probing test 2 or 3 times now due to this one line in particular.
   
   So, I'd be happy to see it go. I probably have too much bad history to make an unbiased call here though 😄 







[GitHub] [kafka] cadonna commented on pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#issuecomment-624258204


   Just in case, I re-ran the system tests: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3929/





[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420052853



##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                         log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                    else:
-                        log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                        log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
       We can leave it because it verifies whether the assignment was triggered in the assignor. However, it does not give us any guarantee that the rebalance actually took place.
   
   I guess what we really would need is a way to check if a group stabilized and if the assignment is valid. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned. So we would actually need to check if they have the highest generation in sync across the processors AND if all processors have at least one task assigned (AND if all tasks were assigned).







[GitHub] [kafka] vvcephei commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420306701



##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                         log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                    else:
-                        log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                        log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
       Thanks, all. This doesn't seem like the best way to verify what we're trying to verify, but it also seems about the same as before. I'm happy to leave this here for now.
   
   If/when the test breaks again, I'd prefer for us to put in a more reliable and direct mechanism.







[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420052853



##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                         log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                    else:
-                        log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                        log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
       I guess what we really would need is a way to check if a group stabilized. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned, because in that specific rebalance the corresponding partitions were revoked from the other processors. So we would actually need to check if the highest generation is in sync across the processors AND if all processors have at least one task assigned (AND if all tasks were assigned).







[GitHub] [kafka] vvcephei commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r419693644



##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                         log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                    else:
-                        log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",

Review comment:
       Thanks @cadonna; I agree. This test should just be verifying that we first converge on 7, and then that we converge on 8.

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -305,11 +305,11 @@ def test_version_probing_upgrade(self):
         self.driver.disable_auto_terminate()
         # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
         self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
-        self.processor1.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor")
+        self.processor1.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor")

Review comment:
       We can actually just delete these lines now.

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                         log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                    else:
-                        log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                        log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
       I know that this check was here in some fashion before, but I'm drawing a blank on why we need to verify this log line. It seems like _just_ checking the version number logs and nothing else would be the key to a long and happy life.
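
As a rough illustration of "just checking the version number logs", the sketch below extracts the commonly supported versions a processor logged and checks that they end at the target version without ever going backwards. The function names are hypothetical, and the pattern `commonly supported version N` is an assumption derived from the log message quoted in the diff above; the actual upgrade message may be worded differently.

```python
import re

# Assumed fragment of the version probing log messages.
VERSION_PATTERN = re.compile(r"commonly supported version (\d+)")


def observed_versions(log_lines):
    """Return the logged versions in order, collapsing consecutive repeats."""
    versions = []
    for line in log_lines:
        match = VERSION_PATTERN.search(line)
        if match:
            version = int(match.group(1))
            if not versions or versions[-1] != version:
                versions.append(version)
    return versions


def converged_on(log_lines, target_version):
    """True if the logs end at target_version and never move to a lower version."""
    versions = observed_versions(log_lines)
    return (
        bool(versions)
        and versions[-1] == target_version
        and versions == sorted(versions)
    )
```

Because the check only requires a non-decreasing sequence that ends at the target, it would also accept a processor that never logged version 7, which is the case for a processor that re-joins after the others have already rebalanced.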







[GitHub] [kafka] vvcephei commented on pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#issuecomment-625589181


   Actually, since the only thing that changed was a python system test file, it couldn't cause any of the integration test failures, so I'll go ahead and merge.
   
   Here were the failures:
       org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
       org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
       org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses
       org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]





[GitHub] [kafka] vvcephei commented on pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#issuecomment-624270635


   Thanks @cadonna, let's see how those tests play out.





[GitHub] [kafka] vvcephei commented on pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#issuecomment-625429514


   The test results are gone now, unfortunately.
   
   Retest this please





[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r419496703



##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                         log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                    else:
-                        log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",

Review comment:
       This verification only holds if the other two processors have not rebalanced before the processor that was bounced last rejoins the group. If that rebalance occurs, the commonly supported version is already 8 when the last processor joins. Whether the rebalance occurs also depends on how long the log file moves on lines 507-509 take.
   
   Actually, the test `test_version_probing_upgrade` is independent of the task assignor in use; the issue was surfaced by the `HighAvailabilityTaskAssignor` but not by the `StickyTaskAssignor`. I cannot say for sure why.
   
   IMO, removing this verification should be OK, since afterwards we check whether the processors have synchronized generations, which means that all three processors successfully joined the group in the end. The state that we no longer explicitly verify is the transient state in which version 7 is still in use although all processors are able to use version 8.
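   
   To illustrate why the generation check does not depend on the join order, here is a minimal sketch. The helper `highest_common_generation` and the sample generation numbers are hypothetical and are not taken from `streams_upgrade_test.py`; the sketch only assumes that each processor's log yields the list of group generations it has joined.

```python
# Hypothetical helper for illustration only; not part of streams_upgrade_test.py.
def highest_common_generation(generations_by_processor):
    """Return the highest generation that every processor has joined, or None."""
    seen = [set(gens) for gens in generations_by_processor.values()]
    if not seen:
        return None
    common = set.intersection(*seen)
    return max(common) if common else None


# Case 1: the processor bounced last (p3) rejoins before the others rebalance.
in_time = {"p1": [3, 4, 5], "p2": [3, 4, 5], "p3": [4, 5]}

# Case 2: p1 and p2 rebalance without p3 first, which adds a generation,
# and p3 only joins afterwards.
late = {"p1": [3, 4, 5, 6], "p2": [3, 4, 5, 6], "p3": [6]}

print(highest_common_generation(in_time))
print(highest_common_generation(late))
```

   In both orderings a common generation exists, so a check of this kind passes either way, whereas a check on the exact log message only matches the first ordering.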







[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r419496703



##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                         log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                    else:
-                        log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",

Review comment:
       This verification only holds if the other two processors have not rebalanced before the processor that was bounced last rejoins the group. Whether this rebalance happens also depends on how long the log file moves on lines 507-509 take.
   Actually, the test `test_version_probing_upgrade` is independent of the task assignor in use; the issue was surfaced by the `HighAvailabilityTaskAssignor` but not by the `StickyTaskAssignor`. I cannot say for sure why.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r419829020



##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                         log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                    else:
-                        log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                        log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
       I think the idea is to verify that the actual version probing rebalance takes place, i.e., that the partition assignor actually handles the version probing once it's detected, and that it signals this to the stream thread, which in turn also handles it correctly. But idk -- I've probably broken and fixed the version probing test 2 or 3 times now due to this one line in particular, so I'd be happy to see it go. I probably have too much bad history to make an unbiased call here though 😄



