Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/16 20:17:44 UTC
[GitHub] [kafka] tim-patterson commented on a change in pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist
tim-patterson commented on a change in pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#discussion_r808418617
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -102,17 +109,26 @@ static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToC
final int movementsNeeded = taskMovements.size();
for (final TaskMovement movement : taskMovements) {
- final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
+ // Attempt to find a caught up standby, otherwise find any caught up client, failing that use the most
+ // caught up client.
+ UUID sourceClient = caughtUpClientsByTaskLoad.poll(
movement.task,
c -> clientStates.get(c).hasStandbyTask(movement.task)
);
- if (standbySourceClient == null) {
- // there's not a caught-up standby available to take over the task, so we'll schedule a warmup instead
- final UUID sourceClient = requireNonNull(
- caughtUpClientsByTaskLoad.poll(movement.task),
- "Tried to move task to caught-up client but none exist"
+
+ if (sourceClient == null) {
+ sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
+ }
+
+ if (sourceClient == null) {
+ sourceClient = requireNonNull(
+ mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination),
+ "Tried to move task to more caught-up client but none exist"
);
+ }
+ if (!clientStates.get(sourceClient).hasStandbyTask(movement.task)) {
+ // there's not a standby available to take over the task, so we'll schedule a warmup instead
Review comment:
I originally did something like that, but a unit test tripped a runtime assertion.
Basically there are 5 cases:
1. Caught-up client with standby
2. Caught-up client without standby
3. Not caught-up client with standby
4. Not caught-up client without standby
5. No client found (runtime assertion)
So for the `sourceClient` returned by `mostCaughtUpEligibleClient(...)` we may actually need to call `swapStandbyAndActive(...)`, depending on whether it has a standby assigned to it.
I guess we can do something like
```
// sourceClient here is the result of the caught-up standby poll above
if (sourceClient != null) {
    // case 1: caught-up client with standby
    swapStandbyAndActive(...)
} else {
    sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
    if (sourceClient != null) {
        // case 2: caught-up client without standby
        moveActiveAndTryToWarmUp(...)
    } else {
        // cases 3-5: fall back to the most caught-up client, or fail if there is none
        sourceClient = requireNonNull(
            mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination),
            "Tried to move task to more caught-up client but none exist"
        );
        if (clientStates.get(sourceClient).hasStandbyTask(movement.task)) {
            swapStandbyAndActive(...)
        } else {
            moveActiveAndTryToWarmUp(...)
        }
    }
}
caughtUpClientsByTaskLoad.offerAll(asList(sourceClient, movement.destination));
```
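To make the five cases easier to check in isolation, here is a rough, self-contained sketch of the same fallback chain. It is not the actual `TaskMovement` code: `SourceClientSketch`, `Candidate`, `Decision`, `Action` and `choose` are made-up names, each client is reduced to three hypothetical fields (`caughtUp`, `hasStandby`, `lag`), and "most caught up" is assumed to mean simply "lowest lag". It does not model the eligibility check against `movement.destination` or the task-load ordering of `caughtUpClientsByTaskLoad`.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

public final class SourceClientSketch {

    /** The two movement helpers named in the comment, modelled as labels only. */
    enum Action { SWAP_STANDBY_AND_ACTIVE, MOVE_ACTIVE_AND_TRY_TO_WARM_UP }

    /** Hypothetical stand-in for one client's state with respect to a single task. */
    static final class Candidate {
        final String id;
        final boolean caughtUp;
        final boolean hasStandby;
        final long lag;

        Candidate(final String id, final boolean caughtUp, final boolean hasStandby, final long lag) {
            this.id = id;
            this.caughtUp = caughtUp;
            this.hasStandby = hasStandby;
            this.lag = lag;
        }
    }

    /** The chosen source client plus the helper that would be called for it. */
    static final class Decision {
        final String sourceClient;
        final Action action;

        Decision(final String sourceClient, final Action action) {
            this.sourceClient = sourceClient;
            this.action = action;
        }
    }

    static Decision choose(final List<Candidate> candidates) {
        // Case 1: a caught-up client that already has a standby for the task.
        Optional<Candidate> source = candidates.stream()
            .filter(c -> c.caughtUp && c.hasStandby)
            .findFirst();
        // Case 2: any caught-up client.
        if (!source.isPresent()) {
            source = candidates.stream().filter(c -> c.caughtUp).findFirst();
        }
        // Cases 3 and 4: nobody is caught up, so take the lowest lag (assumption).
        if (!source.isPresent()) {
            source = candidates.stream().min(Comparator.comparingLong(c -> c.lag));
        }
        // Case 5: no client at all, which fails the same way requireNonNull does.
        final Candidate chosen = Objects.requireNonNull(
            source.orElse(null),
            "Tried to move task to more caught-up client but none exist"
        );
        // Whatever path found the client, the standby check picks the helper.
        final Action action = chosen.hasStandby
            ? Action.SWAP_STANDBY_AND_ACTIVE
            : Action.MOVE_ACTIVE_AND_TRY_TO_WARM_UP;
        return new Decision(chosen.id, action);
    }

    public static void main(final String[] args) {
        // Case 3: no caught-up clients, the least-lagging one holds a standby.
        final Decision d = choose(Arrays.asList(
            new Candidate("a", false, false, 500L),
            new Candidate("b", false, true, 120L)
        ));
        System.out.println(d.sourceClient + " -> " + d.action);
    }
}
```

In this form the standby check happens once at the end instead of in each branch, which is equivalent because a client found by the first lookup always has a standby. Whether that reads better than the nested version in the real method is a judgment call.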