Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/03 03:18:48 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

ableegoldman commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r482678606



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -288,17 +288,17 @@ public void computeTaskLags(final UUID uuid, final Map<TaskId, Long> allTaskEndO
             final Long endOffsetSum = taskEntry.getValue();
             final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L);
 
-            if (endOffsetSum < offsetSum) {
+            if (offsetSum == Task.LATEST_OFFSET) {
+                taskLagTotals.put(task, Task.LATEST_OFFSET);
+            } else if (offsetSum == UNKNOWN_OFFSET_SUM) {
+                taskLagTotals.put(task, UNKNOWN_OFFSET_SUM);
+            } else if (endOffsetSum < offsetSum) {
                 LOG.warn("Task " + task + " had endOffsetSum=" + endOffsetSum + " smaller than offsetSum=" +
                              offsetSum + " on member " + uuid + ". This probably means the task is corrupted," +
                              " which in turn indicates that it will need to restore from scratch if it gets assigned." +
                              " The assignor will de-prioritize returning this task to this member in the hopes that" +
                              " some other member may be able to re-use its state.");
                 taskLagTotals.put(task, endOffsetSum);

Review comment:
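       Noticed that we were logging this warning incorrectly whenever `offsetSum` was one of the negative sentinels (`Task.LATEST_OFFSET` or `UNKNOWN_OFFSET_SUM`), so those two cases are now checked first and passed through unchanged.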
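
       For illustration, here is a minimal standalone sketch of the branch order in the hunk above. It is not the actual `ClientState` code: the class name, the `computeLag` helper, and the sentinel values (-2 and -3) are assumptions made for this example, and the final `endOffsetSum - offsetSum` branch is assumed because it falls outside the lines shown in the diff.

```java
// Hypothetical standalone sketch; names and sentinel values are assumptions,
// not copied from the Kafka Streams sources.
class SentinelLagSketch {
    // Assumed stand-ins for Task.LATEST_OFFSET and UNKNOWN_OFFSET_SUM.
    static final long LATEST_OFFSET = -2L;
    static final long UNKNOWN_OFFSET_SUM = -3L;

    // Mirrors the order of checks in the diff: sentinels first, then the
    // "end offset smaller than offset sum" case, then the ordinary lag.
    static long computeLag(final long endOffsetSum, final long offsetSum) {
        if (offsetSum == LATEST_OFFSET) {
            // Sentinel is passed through as the lag, no warning.
            return LATEST_OFFSET;
        } else if (offsetSum == UNKNOWN_OFFSET_SUM) {
            // Sentinel is passed through as the lag, no warning.
            return UNKNOWN_OFFSET_SUM;
        } else if (endOffsetSum < offsetSum) {
            // The diff logs a warning here and records endOffsetSum as the lag.
            return endOffsetSum;
        } else {
            // Assumption: ordinary lag, not visible in the hunk shown.
            return endOffsetSum - offsetSum;
        }
    }

    public static void main(final String[] args) {
        System.out.println(computeLag(100L, LATEST_OFFSET));
        System.out.println(computeLag(100L, UNKNOWN_OFFSET_SUM));
        System.out.println(computeLag(100L, 150L));
        System.out.println(computeLag(100L, 40L));
    }
}
```

       With the sentinel checks placed first, a sentinel `offsetSum` never reaches the comparison against `endOffsetSum`, so it can neither trigger the warning nor be used in lag arithmetic.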



