Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/15 10:33:09 UTC

[GitHub] [kafka] tim-patterson opened a new pull request #11760: Kafka 13600 Kafka Streams - Fall back to most caught up client if no caught up clients exist

tim-patterson opened a new pull request #11760:
URL: https://github.com/apache/kafka/pull/11760


   Unit test for the task assignment case where no caught-up nodes exist.
   Existing unit and integration tests verify that no other behaviour has changed.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#issuecomment-1080743307


   Streams system tests are green. Going to merge.





[GitHub] [kafka] cadonna commented on pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#issuecomment-1048659159


   The most recent system test run on trunk I could find on Jenkins was one on February 9th. This run does not contain the system test failures I mentioned above (http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2022-02-21--001.system-test-kafka-branch-builder--1645479435--tim-patterson--KAFKA-13600-2--7b1e8797de/report.html).
   
   @tim-patterson Can you consistently reproduce the failures locally on trunk?





[GitHub] [kafka] tim-patterson commented on pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
tim-patterson commented on pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#issuecomment-1049161045


   > The most recent [system test run on trunk](http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2022-02-09--001.system-test-kafka-trunk--1644413755--confluentinc--master--753251b0f6/report.html) I could find on Jenkins was one on February 9th. This run does not contain the system test failures [I mentioned above](http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2022-02-21--001.system-test-kafka-branch-builder--1645479435--tim-patterson--KAFKA-13600-2--7b1e8797de/report.html).
   > 
   > @tim-patterson Can you consistently reproduce the failures locally on trunk?
   
   Yes, I seem to be able to. They're all timeout failures, so maybe the extra latency of waiting for acks=all is enough to tip it over a timeout locally...
   
   So here's a trunk commit from yesterday
   
   ```
   tpatterson@Tims-MacBook-Pro-2 kafka % git status
   On branch trunk
   Your branch is up to date with 'origin/trunk'.
   
   nothing to commit, working tree clean
   tpatterson@Tims-MacBook-Pro-2 kafka % git log --oneline | head -n2
   a5bb45c11a KAFKA-13511: Add support for different unix precisions in TimestampConverter SMT (#11575)
   576496a1ca MINOR: Improve Connect docs (#11642)
   
   tpatterson@Tims-MacBook-Pro-2 kafka % tests/docker/ducker-ak down && ./gradlew clean && REBUILD="t" TC_PATHS="tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py::StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance" _DUCKTAPE_OPTIONS='--parameters '\''{"upgrade_from_version":"0.10.0.1"}'\' bash tests/docker/run_tests.sh
   
   ...
   
   ================================================================================
   SESSION REPORT (ALL TESTS)
   ducktape version: 0.8.1
   session_id:       2022-02-23--001
   run time:         2 minutes 21.028 seconds
   tests run:        1
   passed:           0
   failed:           1
   ignored:          0
   ================================================================================
   test_id:    kafkatest.tests.streams.streams_cooperative_rebalance_upgrade_test.StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance.upgrade_from_version=0.10.0.1
   status:     FAIL
   run time:   2 minutes 20.921 seconds
   
   
       TimeoutError("Never saw 'Processed [0-9]* records so far' message ducker@ducker07")
   Traceback (most recent call last):
     File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 133, in run
       data = self.run_test()
     File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 190, in run_test
       return self.test_context.function(self.test)
     File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 429, in wrapper
       return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
     File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py", line 93, in test_upgrade_to_cooperative_rebalance
       verify_running(processor, self.processing_message)
     File "/opt/kafka-dev/tests/kafkatest/tests/streams/utils/util.py", line 19, in verify_running
       monitor.wait_until(message,
     File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/remoteaccount.py", line 707, in wait_until
       return wait_until(lambda: self.acct.ssh("tail -c +%d %s | grep '%s'" % (self.offset + 1, self.log, pattern),
     File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 41, in wait_until
       raise TimeoutError(err_msg() if callable(err_msg) else err_msg)
   ducktape.errors.TimeoutError: Never saw 'Processed [0-9]* records so far' message ducker@ducker07
   ```





[GitHub] [kafka] cadonna commented on pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#issuecomment-1040158233


   Thank you for the PR @tim-patterson! I am going to have a look at it tomorrow!





[GitHub] [kafka] cadonna commented on a change in pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#discussion_r811184710



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -102,17 +109,26 @@ static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToC
         final int movementsNeeded = taskMovements.size();
 
         for (final TaskMovement movement : taskMovements) {
-            final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
+            // Attempt to find a caught up standby, otherwise find any caught up client, failing that use the most
+            // caught up client.
+            UUID sourceClient = caughtUpClientsByTaskLoad.poll(
                 movement.task,
                 c -> clientStates.get(c).hasStandbyTask(movement.task)
             );
-            if (standbySourceClient == null) {
-                // there's not a caught-up standby available to take over the task, so we'll schedule a warmup instead
-                final UUID sourceClient = requireNonNull(
-                    caughtUpClientsByTaskLoad.poll(movement.task),
-                    "Tried to move task to caught-up client but none exist"
+
+            if (sourceClient == null) {
+                sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
+            }
+
+            if (sourceClient == null) {
+                sourceClient = requireNonNull(
+                        mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination),
+                        "Tried to move task to more caught-up client but none exist"
                 );
+            }
 
+            if (!clientStates.get(sourceClient).hasStandbyTask(movement.task)) {
+                // there's not a standby available to take over the task, so we'll schedule a warmup instead

Review comment:
       I think it is fine as you did it. However, the method is already over 100 lines long. Maybe the code would benefit from a bit of refactoring. The method was already quite long before you added your part. WDYT?







[GitHub] [kafka] tim-patterson edited a comment on pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
tim-patterson edited a comment on pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#issuecomment-1049161045


   > The most recent [system test run on trunk](http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2022-02-09--001.system-test-kafka-trunk--1644413755--confluentinc--master--753251b0f6/report.html) I could find on Jenkins was one on February 9th. This run does not contain the system test failures [I mentioned above](http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2022-02-21--001.system-test-kafka-branch-builder--1645479435--tim-patterson--KAFKA-13600-2--7b1e8797de/report.html).
   > 
   > @tim-patterson Can you consistently reproduce the failures locally on trunk?
   
   Yes, I seem to be able to. They're all timeout failures, so maybe the extra latency of waiting for acks=all is enough to tip it over a timeout locally... Is there any way to see what commit that system test from trunk actually ran on?
   





[GitHub] [kafka] tim-patterson commented on pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
tim-patterson commented on pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#issuecomment-1048281241


   Thanks @cadonna 
   Those tests (well, at least one of them) seem to fail on trunk too.
   
   I've done some digging.
   The tests broke on `34208e` (22nd Jan) due to a typo,
   but once the typo was fixed in `44fcba` (9th Feb), the real test failures started.
   
   By doing a git bisect while checking out the Dockerfile with the typo fixed, it looks like
   `e6db0c KAFKA-13598: enable idempotence producer by default and validate the configs`
   is the commit breaking these tests.





[GitHub] [kafka] showuon commented on pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#issuecomment-1049350759


   @tim-patterson @cadonna, yes, we are aware of the failed system tests because someone reported them last week: https://github.com/apache/kafka/pull/11769. And yes, we have a PR (https://github.com/apache/kafka/pull/11788) ready to fix it. Thanks.
   





[GitHub] [kafka] tim-patterson commented on a change in pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
tim-patterson commented on a change in pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#discussion_r808418617



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -102,17 +109,26 @@ static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToC
         final int movementsNeeded = taskMovements.size();
 
         for (final TaskMovement movement : taskMovements) {
-            final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
+            // Attempt to find a caught up standby, otherwise find any caught up client, failing that use the most
+            // caught up client.
+            UUID sourceClient = caughtUpClientsByTaskLoad.poll(
                 movement.task,
                 c -> clientStates.get(c).hasStandbyTask(movement.task)
             );
-            if (standbySourceClient == null) {
-                // there's not a caught-up standby available to take over the task, so we'll schedule a warmup instead
-                final UUID sourceClient = requireNonNull(
-                    caughtUpClientsByTaskLoad.poll(movement.task),
-                    "Tried to move task to caught-up client but none exist"
+
+            if (sourceClient == null) {
+                sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
+            }
+
+            if (sourceClient == null) {
+                sourceClient = requireNonNull(
+                        mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination),
+                        "Tried to move task to more caught-up client but none exist"
                 );
+            }
 
+            if (!clientStates.get(sourceClient).hasStandbyTask(movement.task)) {
+                // there's not a standby available to take over the task, so we'll schedule a warmup instead

Review comment:
       I sort of originally did something like that, but a unit test caused a runtime assertion.
   
   Basically there are ~5 cases.
   1. Caught up client with standby
   2. Caught up client without standby
   3. Not Caught up client with standby
   4. Not Caught up client without standby
   5. No client found (runtime assertion).
   
   So for the `sourceClient` returned by `mostCaughtUpEligibleClient(...)` we may actually need to call `swapStandbyAndActive(...)`, depending on whether it has a standby assigned to it.
   
   I guess we can do something like
   ```
   if (sourceClient != null) {
       swapStandbyAndActive(...)
   } else {
       sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
       if (sourceClient != null) {
           moveActiveAndTryToWarmUp(...)
       } else {
           sourceClient = requireNonNull(
               mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination),
               "Tried to move task to more caught-up client but none exist"
           );
           if (clientStates.get(sourceClient).hasStandbyTask(movement.task)) {
               swapStandbyAndActive(...)
           } else {
               moveActiveAndTryToWarmUp(...)
           }
       }
   }
   caughtUpClientsByTaskLoad.offerAll(asList(sourceClient, movement.destination));
   ```
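
The five cases listed above can be sketched as a single selection function. This is a minimal, self-contained illustration and not the code in `TaskMovement.java`: the class, enum and method names are hypothetical, clients are plain strings instead of `UUID`s, and the fallback simply takes the first entry of a list assumed to be ordered from least to most lag, giving up if that entry is the destination itself.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the fallback order discussed in this comment.
final class SourceClientSelectionSketch {

    enum Action { SWAP_STANDBY_AND_ACTIVE, MOVE_ACTIVE_AND_TRY_TO_WARM_UP }

    static final class Choice {
        final String sourceClient;
        final Action action;

        Choice(final String sourceClient, final Action action) {
            this.sourceClient = sourceClient;
            this.action = action;
        }
    }

    // clientsByLag is assumed to be ordered from least lag to most lag for one task.
    static Choice choose(final List<String> clientsByLag,
                         final Set<String> caughtUpClients,
                         final Set<String> standbyClients,
                         final String destination) {
        // Case 1: a caught-up client that already hosts a standby can simply swap roles.
        for (final String client : clientsByLag) {
            if (caughtUpClients.contains(client) && standbyClients.contains(client)) {
                return new Choice(client, Action.SWAP_STANDBY_AND_ACTIVE);
            }
        }
        // Case 2: any caught-up client; the destination needs a warmup.
        for (final String client : clientsByLag) {
            if (caughtUpClients.contains(client)) {
                return new Choice(client, Action.MOVE_ACTIVE_AND_TRY_TO_WARM_UP);
            }
        }
        // Cases 3 and 4: nobody is caught up, so fall back to the client with the least lag,
        // unless that client is the destination itself (an assumption of this sketch).
        final String mostCaughtUp = clientsByLag.isEmpty() ? null : clientsByLag.get(0);
        if (mostCaughtUp != null && !mostCaughtUp.equals(destination)) {
            final Action action = standbyClients.contains(mostCaughtUp)
                ? Action.SWAP_STANDBY_AND_ACTIVE
                : Action.MOVE_ACTIVE_AND_TRY_TO_WARM_UP;
            return new Choice(mostCaughtUp, action);
        }
        // Case 5: no eligible client at all.
        throw new IllegalStateException("Tried to move task to more caught-up client but none exist");
    }

    public static void main(final String[] args) {
        // No caught-up clients; "a" has the least lag and hosts a standby (case 3).
        final Choice choice = choose(
            Arrays.asList("a", "b", "c"),
            Collections.emptySet(),
            Collections.singleton("a"),
            "c"
        );
        System.out.println(choice.sourceClient + " " + choice.action);
    }
}
```

Returning the action together with the client keeps the standby check in one place, which is the point made above about the fallback client possibly needing a swap rather than a warmup.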







[GitHub] [kafka] cadonna commented on pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#issuecomment-1047769524


   I ran the Streams system tests on this branch twice yesterday and got the same failures both times.
   
   See http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2022-02-21--001.system-test-kafka-branch-builder--1645479435--tim-patterson--KAFKA-13600-2--7b1e8797de/report.html
    
   It might also be the tests, but it requires investigation.
   
   To run the system tests locally follow these instructions:
   https://github.com/apache/kafka/blob/trunk/tests/README.md





[GitHub] [kafka] tim-patterson commented on a change in pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
tim-patterson commented on a change in pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#discussion_r811393001



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -102,17 +109,26 @@ static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToC
         final int movementsNeeded = taskMovements.size();
 
         for (final TaskMovement movement : taskMovements) {
-            final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
+            // Attempt to find a caught up standby, otherwise find any caught up client, failing that use the most
+            // caught up client.
+            UUID sourceClient = caughtUpClientsByTaskLoad.poll(
                 movement.task,
                 c -> clientStates.get(c).hasStandbyTask(movement.task)
             );
-            if (standbySourceClient == null) {
-                // there's not a caught-up standby available to take over the task, so we'll schedule a warmup instead
-                final UUID sourceClient = requireNonNull(
-                    caughtUpClientsByTaskLoad.poll(movement.task),
-                    "Tried to move task to caught-up client but none exist"
+
+            if (sourceClient == null) {
+                sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
+            }
+
+            if (sourceClient == null) {
+                sourceClient = requireNonNull(
+                        mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination),
+                        "Tried to move task to more caught-up client but none exist"
                 );
+            }
 
+            if (!clientStates.get(sourceClient).hasStandbyTask(movement.task)) {
+                // there's not a standby available to take over the task, so we'll schedule a warmup instead

Review comment:
       Sure let me have a bit of a play around.
   All these methods being static means that there's a lot of state/method arguments to pass to each method call.
   So I'm unsure about how much of a win we're going to get extracting smaller bits of code out into separate methods.
   Maybe some closures/local functions might help....
   I'll have a bit of a play and get back to you :)
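
One way the closure idea could look is sketched below. This is only an illustration of the trade-off, not the refactoring that was pushed: `LocalFunctionSketch`, `Movement`, `assign` and the string-based maps are hypothetical stand-ins, and the two local functions cover just the caught-up cases. Each lambda closes over the shared state once, so the call site passes nothing but the movement.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Predicate;

// Hypothetical sketch: local functions capture shared state instead of receiving it as arguments.
final class LocalFunctionSketch {

    static final class Movement {
        final String task;
        final String destination;

        Movement(final String task, final String destination) {
            this.task = task;
            this.destination = destination;
        }
    }

    // Returns a description of the action chosen for each movement, in order.
    static List<String> assign(final Map<String, SortedSet<String>> caughtUpClientsByTask,
                               final Map<String, Set<String>> standbyTasksByClient,
                               final List<Movement> movements) {
        final List<String> actions = new ArrayList<>();

        // Closes over both maps and the result list; only the movement is a parameter.
        final Predicate<Movement> trySwapWithCaughtUpStandby = m -> {
            for (final String client : caughtUpClientsByTask.getOrDefault(m.task, Collections.emptySortedSet())) {
                if (standbyTasksByClient.getOrDefault(client, Collections.emptySet()).contains(m.task)) {
                    actions.add("swap " + m.task + ": " + client + " -> " + m.destination);
                    return true;
                }
            }
            return false;
        };

        final Predicate<Movement> tryMoveFromCaughtUpClient = m -> {
            final SortedSet<String> caughtUp =
                caughtUpClientsByTask.getOrDefault(m.task, Collections.emptySortedSet());
            if (caughtUp.isEmpty()) {
                return false;
            }
            actions.add("move " + m.task + ": " + caughtUp.first() + " -> " + m.destination + " (warm up)");
            return true;
        };

        for (final Movement movement : movements) {
            // Short-circuit evaluation stops at the first strategy that succeeds.
            final boolean moved = trySwapWithCaughtUpStandby.test(movement)
                || tryMoveFromCaughtUpClient.test(movement);
            if (!moved) {
                throw new IllegalStateException("No source client found for task " + movement.task);
            }
        }
        return actions;
    }

    public static void main(final String[] args) {
        final Map<String, SortedSet<String>> caughtUp = new HashMap<>();
        caughtUp.put("0_0", new TreeSet<>(Collections.singletonList("a")));
        final Map<String, Set<String>> standbys = new HashMap<>();
        System.out.println(assign(caughtUp, standbys, Collections.singletonList(new Movement("0_0", "c"))));
    }
}
```

The cost is that the lambdas cannot be unit tested in isolation the way static helpers can, which may be a reason to prefer small static methods despite the longer argument lists.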
   







[GitHub] [kafka] cadonna commented on a change in pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#discussion_r833117940



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -102,45 +108,27 @@ static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToC
         final int movementsNeeded = taskMovements.size();
 
         for (final TaskMovement movement : taskMovements) {
-            final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
-                movement.task,
-                c -> clientStates.get(c).hasStandbyTask(movement.task)
-            );
-            if (standbySourceClient == null) {
-                // there's not a caught-up standby available to take over the task, so we'll schedule a warmup instead
-                final UUID sourceClient = requireNonNull(
-                    caughtUpClientsByTaskLoad.poll(movement.task),
-                    "Tried to move task to caught-up client but none exist"
-                );
-
-                moveActiveAndTryToWarmUp(
-                    remainingWarmupReplicas,
-                    movement.task,
-                    clientStates.get(sourceClient),
-                    clientStates.get(movement.destination),
-                    warmups.computeIfAbsent(movement.destination, x -> new TreeSet<>())
-                );
-                caughtUpClientsByTaskLoad.offerAll(asList(sourceClient, movement.destination));
-            } else {
-                // we found a candidate to trade standby/active state with our destination, so we don't need a warmup
-                swapStandbyAndActive(
-                    movement.task,
-                    clientStates.get(standbySourceClient),
-                    clientStates.get(movement.destination)
-                );
-                caughtUpClientsByTaskLoad.offerAll(asList(standbySourceClient, movement.destination));
+            // Attempt to find a caught up standby, otherwise find any caught up client, failing that use the most
+            // caught up client.
+            final boolean moved = tryToSwapStandbyAndActiveOnCaughtUpClient(clientStates, caughtUpClientsByTaskLoad, movement) ||
+                    tryToMoveActiveToCaughtUpClientAndTryToWarmUp(clientStates, warmups, remainingWarmupReplicas, caughtUpClientsByTaskLoad, movement) ||
+                    tryToMoveActiveToMostCaughtUpClient(tasksToClientByLag, clientStates, warmups, remainingWarmupReplicas, caughtUpClientsByTaskLoad, movement);
+
+            if (!moved) {
+                throw new IllegalStateException("Tried to move task to more caught-up client but none exist");

Review comment:
       ```suggestion
                   throw new IllegalStateException("Tried to move task to more caught-up client as scheduled before but none exist");
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -102,45 +108,27 @@ static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToC
         final int movementsNeeded = taskMovements.size();
 
         for (final TaskMovement movement : taskMovements) {
-            final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
-                movement.task,
-                c -> clientStates.get(c).hasStandbyTask(movement.task)
-            );
-            if (standbySourceClient == null) {
-                // there's not a caught-up standby available to take over the task, so we'll schedule a warmup instead
-                final UUID sourceClient = requireNonNull(
-                    caughtUpClientsByTaskLoad.poll(movement.task),
-                    "Tried to move task to caught-up client but none exist"
-                );
-
-                moveActiveAndTryToWarmUp(
-                    remainingWarmupReplicas,
-                    movement.task,
-                    clientStates.get(sourceClient),
-                    clientStates.get(movement.destination),
-                    warmups.computeIfAbsent(movement.destination, x -> new TreeSet<>())
-                );
-                caughtUpClientsByTaskLoad.offerAll(asList(sourceClient, movement.destination));
-            } else {
-                // we found a candidate to trade standby/active state with our destination, so we don't need a warmup
-                swapStandbyAndActive(
-                    movement.task,
-                    clientStates.get(standbySourceClient),
-                    clientStates.get(movement.destination)
-                );
-                caughtUpClientsByTaskLoad.offerAll(asList(standbySourceClient, movement.destination));
+            // Attempt to find a caught up standby, otherwise find any caught up client, failing that use the most
+            // caught up client.
+            final boolean moved = tryToSwapStandbyAndActiveOnCaughtUpClient(clientStates, caughtUpClientsByTaskLoad, movement) ||
+                    tryToMoveActiveToCaughtUpClientAndTryToWarmUp(clientStates, warmups, remainingWarmupReplicas, caughtUpClientsByTaskLoad, movement) ||
+                    tryToMoveActiveToMostCaughtUpClient(tasksToClientByLag, clientStates, warmups, remainingWarmupReplicas, caughtUpClientsByTaskLoad, movement);

Review comment:
       Could you add tests to `TaskMovementTest` for the `tryToMoveActiveToMostCaughtUpClient()` case that you added?







[GitHub] [kafka] tim-patterson commented on a change in pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
tim-patterson commented on a change in pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#discussion_r815521489



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -56,25 +54,34 @@ private int numCaughtUpClients() {
         return caughtUpClients.size();
     }
 
-    private static boolean taskIsNotCaughtUpOnClientAndOtherCaughtUpClientsExist(final TaskId task,
-                                                                                 final UUID client,
-                                                                                 final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients) {
-        return !taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
+    private static boolean taskIsNotCaughtUpOnClientAndOtherMoreCaughtUpClientsExist(final TaskId task,
+                                                                                     final UUID client,
+                                                                                     final Map<UUID, ClientState> clientStates,
+                                                                                     final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
+                                                                                     final Map<TaskId, List<UUID>> tasksToClientByLag) {

Review comment:
       Sure, I've pushed a commit that does this.
   My only concern is that these `SortedSet`s now end up holding a reference to `clientStates` rather than just being a plain old list/set etc.
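
To make that trade-off concrete, here is a minimal sketch of a lag-ordered set whose comparator has to look lags up somewhere. All names (`LagOrderedClientsSketch`, `clientsByLag`, `lagByClient`) are hypothetical, and a plain `Map<UUID, Long>` stands in for the real `clientStates`; this is not the code in the PR. The sketch assumes that copying the lag map into a private snapshot is acceptable, so the set never depends on a map the caller might mutate later.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;

// Hypothetical illustration only; not the Kafka Streams implementation.
final class LagOrderedClientsSketch {

    // Returns client IDs ordered by ascending lag. Ties are broken by UUID so that
    // two distinct clients with the same lag are both kept in the set.
    static SortedSet<UUID> clientsByLag(final Map<UUID, Long> lagByClient) {
        // Snapshot the lags: the comparator captures this copy, not the caller's map.
        final Map<UUID, Long> snapshot = new HashMap<>(lagByClient);
        final Comparator<UUID> byLagThenId = (a, b) -> {
            final int byLag = Long.compare(snapshot.get(a), snapshot.get(b));
            return byLag != 0 ? byLag : a.compareTo(b);
        };
        final SortedSet<UUID> clients = new TreeSet<>(byLagThenId);
        clients.addAll(snapshot.keySet());
        return clients;
    }
}
```

Compared with a `List`, the set makes it explicit that each client ID appears at most once. The cost is the captured lookup table inside the comparator, which is the reference mentioned above.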




[GitHub] [kafka] cadonna commented on a change in pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#discussion_r808913458



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -102,17 +109,26 @@ static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToC
         final int movementsNeeded = taskMovements.size();
 
         for (final TaskMovement movement : taskMovements) {
-            final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
+            // Attempt to find a caught up standby, otherwise find any caught up client, failing that use the most
+            // caught up client.
+            UUID sourceClient = caughtUpClientsByTaskLoad.poll(
                 movement.task,
                 c -> clientStates.get(c).hasStandbyTask(movement.task)
             );
-            if (standbySourceClient == null) {
-                // there's not a caught-up standby available to take over the task, so we'll schedule a warmup instead
-                final UUID sourceClient = requireNonNull(
-                    caughtUpClientsByTaskLoad.poll(movement.task),
-                    "Tried to move task to caught-up client but none exist"
+
+            if (sourceClient == null) {
+                sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
+            }
+
+            if (sourceClient == null) {
+                sourceClient = requireNonNull(
+                        mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination),
+                        "Tried to move task to more caught-up client but none exist"
                 );
+            }
 
+            if (!clientStates.get(sourceClient).hasStandbyTask(movement.task)) {
+                // there's not a standby available to take over the task, so we'll schedule a warmup instead

Review comment:
       We could even make it more explicit by introducing methods like:
   ```
   private boolean tryToSwapStandbyAndActiveOnCaughtUpClient(...);
   private boolean tryToMoveToCaughtUpClientAndTryToWarmUp(...);
   private boolean tryToSwapStandbyAndActiveOnMostCaughtUpClient(...);
   private boolean tryToMoveToMostCaughtUpClientAndTryToWarmUp(...);
   ```
   But that is optional and up to you.
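
One way such boolean `tryTo...` helpers could be combined is with short-circuit `||`, so that each fallback is only attempted when the previous one reported failure. The sketch below is an assumption-laden illustration: `TryChainSketch`, `tryStep`, and the boolean flags are hypothetical stand-ins for the real assignment state, and only the three step names are taken from this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of chaining "try" helpers; not the Kafka Streams implementation.
final class TryChainSketch {

    // Records which steps were actually attempted, in order.
    final List<String> attempted = new ArrayList<>();

    // Stand-in for a real helper: notes the attempt and reports whether it succeeded.
    private boolean tryStep(final String name, final boolean succeeds) {
        attempted.add(name);
        return succeeds;
    }

    // Each flag says whether the corresponding step would find a suitable source client.
    void move(final boolean caughtUpStandbyExists,
              final boolean caughtUpClientExists,
              final boolean moreCaughtUpClientExists) {
        // Short-circuit evaluation stops at the first helper that returns true.
        final boolean moved =
            tryStep("tryToSwapStandbyAndActiveOnCaughtUpClient", caughtUpStandbyExists)
                || tryStep("tryToMoveActiveToCaughtUpClientAndTryToWarmUp", caughtUpClientExists)
                || tryStep("tryToMoveActiveToMostCaughtUpClient", moreCaughtUpClientExists);
        if (!moved) {
            throw new IllegalStateException("Tried to move task to more caught-up client but none exist");
        }
    }
}
```

The order of the operands encodes the preference order, which keeps the caller to a single expression at the cost of hiding side effects inside the helpers.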




[GitHub] [kafka] cadonna commented on pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#issuecomment-1080760269


   Cherry-picked to 3.2


[GitHub] [kafka] tim-patterson commented on pull request #11760: Kafka 13600 Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
tim-patterson commented on pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#issuecomment-1040111496


   @vvcephei


[GitHub] [kafka] cadonna commented on pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#issuecomment-1048522382


   Thanks a lot for the investigation! 
   Before I wrote my last comment, I had a quick look at other system test runs on trunk and thought they were passing, but apparently I looked too quickly and missed something.
   
   @showuon Are you aware of any issues with your PR mentioned above?


[GitHub] [kafka] tim-patterson commented on pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
tim-patterson commented on pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#issuecomment-1067231809


   Hi @cadonna,
   Sorry, day job stuff got in the way for a bit.
   I've merged in trunk and the streams integration tests all run locally for me now.
   
   I also split up that big method with a few helpers, WDYT?


[GitHub] [kafka] tim-patterson commented on pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
tim-patterson commented on pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#issuecomment-1080135755


   @cadonna Hey Bruno, how's this?


[GitHub] [kafka] cadonna commented on pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#issuecomment-1049589030


   Thank you both for clearing this up! I also ran the system tests on trunk and got the same failures.
   http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2022-02-23--001.system-test-kafka-branch-builder--1645642736--apache--trunk--6f09c3f88b/report.html


[GitHub] [kafka] cadonna commented on a change in pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#discussion_r811394037



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -102,17 +109,26 @@ static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToC
         final int movementsNeeded = taskMovements.size();
 
         for (final TaskMovement movement : taskMovements) {
-            final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
+            // Attempt to find a caught up standby, otherwise find any caught up client, failing that use the most
+            // caught up client.
+            UUID sourceClient = caughtUpClientsByTaskLoad.poll(
                 movement.task,
                 c -> clientStates.get(c).hasStandbyTask(movement.task)
             );
-            if (standbySourceClient == null) {
-                // there's not a caught-up standby available to take over the task, so we'll schedule a warmup instead
-                final UUID sourceClient = requireNonNull(
-                    caughtUpClientsByTaskLoad.poll(movement.task),
-                    "Tried to move task to caught-up client but none exist"
+
+            if (sourceClient == null) {
+                sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
+            }
+
+            if (sourceClient == null) {
+                sourceClient = requireNonNull(
+                        mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination),
+                        "Tried to move task to more caught-up client but none exist"
                 );
+            }
 
+            if (!clientStates.get(sourceClient).hasStandbyTask(movement.task)) {
+                // there's not a standby available to take over the task, so we'll schedule a warmup instead

Review comment:
       That sounds great! Thanks!




[GitHub] [kafka] cadonna commented on a change in pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#discussion_r811402366



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -56,25 +54,34 @@ private int numCaughtUpClients() {
         return caughtUpClients.size();
     }
 
-    private static boolean taskIsNotCaughtUpOnClientAndOtherCaughtUpClientsExist(final TaskId task,
-                                                                                 final UUID client,
-                                                                                 final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients) {
-        return !taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
+    private static boolean taskIsNotCaughtUpOnClientAndOtherMoreCaughtUpClientsExist(final TaskId task,
+                                                                                     final UUID client,
+                                                                                     final Map<UUID, ClientState> clientStates,
+                                                                                     final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
+                                                                                     final Map<TaskId, List<UUID>> tasksToClientByLag) {

Review comment:
       Would it be possible to use a `SortedSet` instead of a `List` here? It would make it clearer that client IDs should only appear once in this list.




[GitHub] [kafka] cadonna commented on a change in pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#discussion_r807858181



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -102,17 +109,26 @@ static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToC
         final int movementsNeeded = taskMovements.size();
 
         for (final TaskMovement movement : taskMovements) {
-            final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
+            // Attempt to find a caught up standby, otherwise find any caught up client, failing that use the most
+            // caught up client.
+            UUID sourceClient = caughtUpClientsByTaskLoad.poll(
                 movement.task,
                 c -> clientStates.get(c).hasStandbyTask(movement.task)
             );
-            if (standbySourceClient == null) {
-                // there's not a caught-up standby available to take over the task, so we'll schedule a warmup instead
-                final UUID sourceClient = requireNonNull(
-                    caughtUpClientsByTaskLoad.poll(movement.task),
-                    "Tried to move task to caught-up client but none exist"
+
+            if (sourceClient == null) {
+                sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
+            }
+
+            if (sourceClient == null) {
+                sourceClient = requireNonNull(
+                        mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination),
+                        "Tried to move task to more caught-up client but none exist"
                 );
+            }
 
+            if (!clientStates.get(sourceClient).hasStandbyTask(movement.task)) {
+                // there's not a standby available to take over the task, so we'll schedule a warmup instead

Review comment:
       This part of the code is a bit hard to read. Once the check after line 117 (`if sourceClient != null`) shows that there is a client with a standby, you could execute the content of the else branch on line 140. If there is no such client, you can then check for a caught-up client and, failing that, the most caught-up client. Something like:
   ```
   if (sourceClient != null) {
       swapStandbyAndActive(...);
   } else {
       sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
       if (sourceClient == null) {
           sourceClient = requireNonNull(
               mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination),
               "Tried to move task to more caught-up client but none exist"
           );
       }
       moveActiveAndTryToWarmUp(...);
   }
   caughtUpClientsByTaskLoad.offerAll(asList(sourceClient, movement.destination));
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -56,25 +54,34 @@ private int numCaughtUpClients() {
         return caughtUpClients.size();
     }
 
-    private static boolean taskIsNotCaughtUpOnClientAndOtherCaughtUpClientsExist(final TaskId task,
-                                                                                 final UUID client,
-                                                                                 final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients) {
-        return !taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
+    private static boolean taskIsNotCaughtUpOnClientAndOtherMoreCaughtUpClientsExist(final TaskId task,
+                                                                                     final UUID client,
+                                                                                     final Map<UUID, ClientState> clientStates,
+                                                                                     final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
+                                                                                     final Map<TaskId, List<UUID>> tasksToClientByLag) {
+        final List<UUID> taskClients = requireNonNull(tasksToClientByLag.get(task), "uninitialized map");

Review comment:
       ```suggestion
           final List<UUID> taskClients = requireNonNull(tasksToClientByLag.get(task), "uninitialized list");
   ```




[GitHub] [kafka] cadonna commented on a change in pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#discussion_r808907120



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -102,17 +109,26 @@ static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToC
         final int movementsNeeded = taskMovements.size();
 
         for (final TaskMovement movement : taskMovements) {
-            final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
+            // Attempt to find a caught up standby, otherwise find any caught up client, failing that use the most
+            // caught up client.
+            UUID sourceClient = caughtUpClientsByTaskLoad.poll(
                 movement.task,
                 c -> clientStates.get(c).hasStandbyTask(movement.task)
             );
-            if (standbySourceClient == null) {
-                // there's not a caught-up standby available to take over the task, so we'll schedule a warmup instead
-                final UUID sourceClient = requireNonNull(
-                    caughtUpClientsByTaskLoad.poll(movement.task),
-                    "Tried to move task to caught-up client but none exist"
+
+            if (sourceClient == null) {
+                sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
+            }
+
+            if (sourceClient == null) {
+                sourceClient = requireNonNull(
+                        mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination),
+                        "Tried to move task to more caught-up client but none exist"
                 );
+            }
 
+            if (!clientStates.get(sourceClient).hasStandbyTask(movement.task)) {
+                // there's not a standby available to take over the task, so we'll schedule a warmup instead

Review comment:
       Ah, I totally missed case 3. 
   
   What about the following:
   ```
   if (sourceClient != null) {
       swapStandbyAndActive(...);
   } else {
       sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
       if (sourceClient != null) {
           moveActiveAndTryToWarmUp(...);
       } else {
           sourceClient = mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination);
           if (sourceClient != null) {
               if (clientStates.get(sourceClient).hasStandbyTask(movement.task)) {
                   swapStandbyAndActive(...);
               } else {
                   moveActiveAndTryToWarmUp(...);
               }
           } else {
               throw new IllegalStateException("Tried to move task to more caught-up client but none exist");
           }
       }
   }
   caughtUpClientsByTaskLoad.offerAll(asList(sourceClient, movement.destination));
   ```
   
   I think it makes the cases more explicit in the code.
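
The three cases above can also be sketched as a small self-contained method. Everything here is hypothetical: `FallbackSketch`, `pickSource`, and the string client IDs stand in for the real `caughtUpClientsByTaskLoad`, `tasksToClientByLag`, and `clientStates` structures, and the sketch assumes that "most caught up" means the first entry of a lag-ordered list, as long as that entry is not the destination itself.

```java
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the selection order discussed above; not Kafka Streams code.
final class FallbackSketch {

    enum Action { SWAP_STANDBY_AND_ACTIVE, MOVE_ACTIVE_AND_TRY_TO_WARM_UP }

    static final class Choice {
        final String sourceClient;
        final Action action;

        Choice(final String sourceClient, final Action action) {
            this.sourceClient = sourceClient;
            this.action = action;
        }
    }

    static Choice pickSource(final List<String> caughtUpByLoad,   // caught-up clients, least loaded first
                             final List<String> allByLag,         // all clients, smallest lag first
                             final Set<String> standbyHolders,    // clients hosting a standby of the task
                             final String destination) {
        // Case 1: a caught-up client that already hosts a standby can swap roles.
        for (final String client : caughtUpByLoad) {
            if (standbyHolders.contains(client)) {
                return new Choice(client, Action.SWAP_STANDBY_AND_ACTIVE);
            }
        }
        // Case 2: any caught-up client takes the active task while the destination warms up.
        if (!caughtUpByLoad.isEmpty()) {
            return new Choice(caughtUpByLoad.get(0), Action.MOVE_ACTIVE_AND_TRY_TO_WARM_UP);
        }
        // Case 3: nobody is caught up, so fall back to the client with the smallest lag,
        // provided it is not the destination itself (an assumption of this sketch).
        if (!allByLag.isEmpty() && !allByLag.get(0).equals(destination)) {
            final String client = allByLag.get(0);
            final Action action = standbyHolders.contains(client)
                ? Action.SWAP_STANDBY_AND_ACTIVE
                : Action.MOVE_ACTIVE_AND_TRY_TO_WARM_UP;
            return new Choice(client, action);
        }
        throw new IllegalStateException("Tried to move task to more caught-up client but none exist");
    }
}
```

Returning the chosen client together with the action keeps the decision separate from the mutation of the assignment, which may make each case easier to unit test.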




[GitHub] [kafka] cadonna commented on pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#issuecomment-1040158233


   Thank you for the PR @tim-patterson! I am going to have a look at it tomorrow!


[GitHub] [kafka] tim-patterson commented on a change in pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
tim-patterson commented on a change in pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#discussion_r810711885



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -102,17 +109,26 @@ static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToC
         final int movementsNeeded = taskMovements.size();
 
         for (final TaskMovement movement : taskMovements) {
-            final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
+            // Attempt to find a caught up standby, otherwise find any caught up client, failing that use the most
+            // caught up client.
+            UUID sourceClient = caughtUpClientsByTaskLoad.poll(
                 movement.task,
                 c -> clientStates.get(c).hasStandbyTask(movement.task)
             );
-            if (standbySourceClient == null) {
-                // there's not a caught-up standby available to take over the task, so we'll schedule a warmup instead
-                final UUID sourceClient = requireNonNull(
-                    caughtUpClientsByTaskLoad.poll(movement.task),
-                    "Tried to move task to caught-up client but none exist"
+
+            if (sourceClient == null) {
+                sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
+            }
+
+            if (sourceClient == null) {
+                sourceClient = requireNonNull(
+                        mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination),
+                        "Tried to move task to more caught-up client but none exist"
                 );
+            }
 
+            if (!clientStates.get(sourceClient).hasStandbyTask(movement.task)) {
+                // there's not a standby available to take over the task, so we'll schedule a warmup instead

Review comment:
       I've done it with the nested if/else blocks but stopped reusing the same `sourceClient` variable to try to make it a little clearer.
   Let me know what you think.




[GitHub] [kafka] tim-patterson commented on pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
tim-patterson commented on pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#issuecomment-1049175838


   I wonder if this is the fix https://issues.apache.org/jira/browse/KAFKA-13673
   (Not sure what tests in particular it's trying to fix)


[GitHub] [kafka] cadonna edited a comment on pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
cadonna edited a comment on pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#issuecomment-1048659159


   The most recent [system test run on trunk](http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2022-02-09--001.system-test-kafka-trunk--1644413755--confluentinc--master--753251b0f6/report.html) I could find on Jenkins was one on February 9th. This run does not contain the system test failures [I mentioned above](http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2022-02-21--001.system-test-kafka-branch-builder--1645479435--tim-patterson--KAFKA-13600-2--7b1e8797de/report.html).
   
   @tim-patterson Can you consistently reproduce the failures locally on trunk?


[GitHub] [kafka] cadonna merged pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

Posted by GitBox <gi...@apache.org>.
cadonna merged pull request #11760:
URL: https://github.com/apache/kafka/pull/11760


   

