Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/02 05:24:05 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #8775: KAFKA-10079: improve thread-level stickiness

ableegoldman opened a new pull request #8775:
URL: https://github.com/apache/kafka/pull/8775


   Uses an algorithm similar to (but slightly different from) the one in [KAFKA-9987](https://issues.apache.org/jira/browse/KAFKA-9987) to produce a maximally sticky, perfectly balanced assignment of tasks to threads within a single client. This matters for in-memory stores, which are wiped out when a task is transferred between threads.
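
A minimal sketch of what a sticky, balanced task-to-thread assignment could look like. This is not the code from the PR: the class name `StickyThreadAssignment`, the use of plain `String` ids for tasks and threads, and the three-pass structure are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper, not part of Kafka Streams.
final class StickyThreadAssignment {

    // tasks: every task assigned to this client in the current rebalance.
    // previousTasksByThread: one entry per live thread, mapping it to the tasks it owned before
    // (assumed disjoint across threads; a brand-new thread maps to an empty set).
    static Map<String, List<String>> assign(final Set<String> tasks,
                                            final Map<String, Set<String>> previousTasksByThread) {
        if (previousTasksByThread.isEmpty()) {
            throw new IllegalArgumentException("need at least one thread");
        }
        final int floor = tasks.size() / previousTasksByThread.size();
        int extras = tasks.size() % previousTasksByThread.size(); // threads allowed floor + 1 tasks

        final Set<String> unassigned = new TreeSet<String>(tasks);
        final Map<String, Set<String>> sortedPrevious =
            new TreeMap<String, Set<String>>(previousTasksByThread);
        final Map<String, List<String>> assignment = new TreeMap<String, List<String>>();

        // Pass 1: every thread keeps up to `floor` of the tasks it previously owned.
        for (final Map.Entry<String, Set<String>> entry : sortedPrevious.entrySet()) {
            final List<String> kept = new ArrayList<String>();
            for (final String task : new TreeSet<String>(entry.getValue())) {
                if (kept.size() < floor && unassigned.remove(task)) {
                    kept.add(task);
                }
            }
            assignment.put(entry.getKey(), kept);
        }

        // Pass 2: while extra slots remain, a full thread may keep one more previously owned task.
        for (final Map.Entry<String, List<String>> entry : assignment.entrySet()) {
            if (extras == 0) {
                break;
            }
            if (entry.getValue().size() < floor) {
                continue; // this thread has no further previous tasks to keep
            }
            for (final String task : new TreeSet<String>(sortedPrevious.get(entry.getKey()))) {
                if (unassigned.remove(task)) {
                    entry.getValue().add(task);
                    extras--;
                    break;
                }
            }
        }

        // Pass 3: whatever is left has to move; give each task to the least loaded thread.
        for (final String task : unassigned) {
            Collections.min(assignment.values(),
                            Comparator.comparingInt((List<String> owned) -> owned.size())).add(task);
        }
        return assignment;
    }
}
```

In this sketch a task only changes threads in the last pass, after every thread has kept as many of its previous tasks as the balance constraint allows.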
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-637849595






----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#discussion_r434018826



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -242,8 +250,9 @@ public void addOwnedPartitions(final Collection<TopicPartition> ownedPartitions,
         }
     }
 
-    public void addPreviousTasksAndOffsetSums(final Map<TaskId, Long> taskOffsetSums) {
+    public void addPreviousTasksAndOffsetSums(final String consumerId, final Map<TaskId, Long> taskOffsetSums) {
         this.taskOffsetSums.putAll(taskOffsetSums);
+        consumerToPreviousTaskIds.put(consumerId, taskOffsetSums.keySet());

Review comment:
       Definitely. I meant to write tests but then I took Luna for a walk and forgot 😄 




----------------------------------------------------------------



[GitHub] [kafka] vvcephei merged pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #8775:
URL: https://github.com/apache/kafka/pull/8775


   


----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on a change in pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#discussion_r434892857



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
##########
@@ -300,20 +302,71 @@ public void shouldNotHaveUnfulfilledQuotaWhenActiveTaskSizeGreaterEqualThanCapac
     @Test
     public void shouldAddTasksWithLatestOffsetToPrevActiveTasks() {
         final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(TASK_0_1, Task.LATEST_OFFSET);
-        client.addPreviousTasksAndOffsetSums(taskOffsetSums);
+        client.addPreviousTasksAndOffsetSums("c1", taskOffsetSums);
         client.initializePrevTasks(Collections.emptyMap());
         assertThat(client.prevActiveTasks(), equalTo(Collections.singleton(TASK_0_1)));
         assertThat(client.previousAssignedTasks(), equalTo(Collections.singleton(TASK_0_1)));
         assertTrue(client.prevStandbyTasks().isEmpty());
     }
 
+    @Test
+    public void shouldReturnPreviousStatefulTasksForConsumer() {
+        client.addPreviousTasksAndOffsetSums("c1", Collections.singletonMap(TASK_0_1, Task.LATEST_OFFSET));
+        client.addPreviousTasksAndOffsetSums("c2", Collections.singletonMap(TASK_0_2, 0L));
+        client.addPreviousTasksAndOffsetSums("c3", Collections.emptyMap());
+
+        client.initializePrevTasks(Collections.emptyMap());
+        client.computeTaskLags(
+            UUID_1,
+            mkMap(
+                mkEntry(TASK_0_1, 1_000L),
+                mkEntry(TASK_0_2, 1_000L)
+            )
+        );
+
+        assertThat(client.previousTasksForConsumer("c1"), equalTo(mkSortedSet(TASK_0_1)));
+        assertThat(client.previousTasksForConsumer("c2"), equalTo(mkSortedSet(TASK_0_2)));
+        assertTrue(client.previousTasksForConsumer("c3").isEmpty());
+    }
+
+    @Test
+    public void shouldReturnPreviousStatefulTasksForConsumerWhenLagIsNotComputed() {
+        client.addPreviousTasksAndOffsetSums("c1", Collections.singletonMap(TASK_0_1, 1000L));
+        client.initializePrevTasks(Collections.emptyMap());
+
+        assertThat(client.previousTasksForConsumer("c1"), equalTo(mkSortedSet(TASK_0_1)));
+    }
+
+    @Test
+    public void shouldReturnPreviousStatefulTasksForConsumerInIncreasingLagOrder() {

Review comment:
       I missed the extra sort on my last review. It really seems like too much fanciness for the ClientState to sort the tasks in lag order. Would it be too messy to move the sort aspect out to the balancing code that needs it?
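
For illustration, moving the sort out of `ClientState` could amount to the balancing code ordering the tasks itself with a comparator over a lag lookup. The sketch below is an assumption, not code from the PR: `LagOrdering` and `inIncreasingLagOrder` are hypothetical names, tasks are plain strings, and the caller supplies the lag function (for example a method reference to something like `lagFor`).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical helper that would live next to the balancing code rather than in ClientState.
final class LagOrdering {

    // Returns the tasks sorted by increasing lag; ties are broken by task id so the
    // result is deterministic.
    static List<String> inIncreasingLagOrder(final Collection<String> tasks,
                                             final ToLongFunction<String> lagFor) {
        final List<String> sorted = new ArrayList<String>(tasks);
        sorted.sort(Comparator.<String>comparingLong(lagFor)
                              .thenComparing(Comparator.<String>naturalOrder()));
        return sorted;
    }
}
```

With this shape, `ClientState` only has to expose the per-task lag, and the ordering policy stays with the code that needs it.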




----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-637849782


   Test this please 


----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-639672280


   200 runs and I can't reproduce either. But it looks like both were previously flaky, and seem unrelated to this PR. Can we kick off tests again?


----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-642075119


   Cherry picked to 2.6


----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-641638658


   Test this please


----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on a change in pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#discussion_r433923120



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -938,57 +930,9 @@ private void populatePartitionsByHostMaps(final Map<HostInfo, Set<TopicPartition
         return assignment;
     }
 
-    /**
-     * Computes the assignment of tasks to threads within each client and assembles the final assignment to send out,
-     * in the special case of version probing where some members are on different versions and have sent different
-     * subscriptions.
-     *
-     * @return the final assignment for each StreamThread consumer
-     */
-    private Map<String, Assignment> versionProbingAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
-                                                             final Map<TaskId, Set<TopicPartition>> partitionsForTask,
-                                                             final Map<HostInfo, Set<TopicPartition>> partitionsByHost,
-                                                             final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
-                                                             final Set<TopicPartition> allOwnedPartitions,
-                                                             final int minUserMetadataVersion,
-                                                             final int minSupportedMetadataVersion) {
-        final Map<String, Assignment> assignment = new HashMap<>();
-
-        // Since we know another rebalance will be triggered anyway, just try and generate a balanced assignment
-        // (without violating cooperative protocol) now so that on the second rebalance we can just give tasks
-        // back to their previous owners
-        // within the client, distribute tasks to its owned consumers
-        for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
-            final ClientState state = clientMetadata.state;
-
-            final Map<String, List<TaskId>> interleavedActive =
-                interleaveConsumerTasksByGroupId(state.activeTasks(), clientMetadata.consumers);
-            final Map<String, List<TaskId>> interleavedStandby =
-                interleaveConsumerTasksByGroupId(state.standbyTasks(), clientMetadata.consumers);
-
-            addClientAssignments(
-                assignment,
-                clientMetadata,
-                partitionsForTask,
-                partitionsByHost,
-                standbyPartitionsByHost,
-                allOwnedPartitions,
-                interleavedActive,
-                interleavedStandby,
-                minUserMetadataVersion,
-                minSupportedMetadataVersion,
-                true,
-                false);
-        }
-
-        log.info("Finished unstable assignment of tasks, a followup rebalance will be scheduled due to version probing.");
-
-        return assignment;
-    }
-
     /**
      * Adds the encoded assignment for each StreamThread consumer in the client to the overall assignment map
-     * @return true if this client has been told to schedule a followup rebalance
+     * @return true if a followup rebalance will be required due to revoekd tasks

Review comment:
       ```suggestion
        * @return true if a followup rebalance will be required due to revoked tasks
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -242,8 +250,9 @@ public void addOwnedPartitions(final Collection<TopicPartition> ownedPartitions,
         }
     }
 
-    public void addPreviousTasksAndOffsetSums(final Map<TaskId, Long> taskOffsetSums) {
+    public void addPreviousTasksAndOffsetSums(final String consumerId, final Map<TaskId, Long> taskOffsetSums) {
         this.taskOffsetSums.putAll(taskOffsetSums);
+        consumerToPreviousTaskIds.put(consumerId, taskOffsetSums.keySet());

Review comment:
       We have several new methods, and also this new book-kept collection (`consumerToPreviousTaskIds`), but no new tests for them in ClientStateTest. Can you add the missing coverage?
   
   The new methods are more a matter of principle; I'm really concerned that we should have good coverage on the bookkeeping aspect of `consumerToPreviousTaskIds`, because I fear future regressions when we have to maintain two data structures in a consistent fashion.
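
To make the consistency concern concrete, here is a small sketch of the invariant such tests could pin down: the union of the per-consumer task sets should equal the key set of the offset-sum map. The class `PreviousTaskBookkeeping`, the `isConsistent` check, the `String` task ids, and the defensive copy of the key set are all assumptions for illustration; only the two field names and the method name mirror the diff above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical, stripped-down stand-in for the two collections kept by ClientState.
final class PreviousTaskBookkeeping {
    private final Map<String, Long> taskOffsetSums = new HashMap<String, Long>();
    private final Map<String, Set<String>> consumerToPreviousTaskIds = new TreeMap<String, Set<String>>();

    void addPreviousTasksAndOffsetSums(final String consumerId, final Map<String, Long> offsetSums) {
        taskOffsetSums.putAll(offsetSums);
        // Copy the key set so later changes to the caller's map cannot leak in (an assumption of this sketch).
        consumerToPreviousTaskIds.put(consumerId, new TreeSet<String>(offsetSums.keySet()));
    }

    Set<String> previousTasksForConsumer(final String consumerId) {
        return consumerToPreviousTaskIds.get(consumerId);
    }

    // The invariant: every task with an offset sum belongs to some consumer, and vice versa.
    boolean isConsistent() {
        final Set<String> union = new HashSet<String>();
        for (final Set<String> tasks : consumerToPreviousTaskIds.values()) {
            union.addAll(tasks);
        }
        return union.equals(taskOffsetSums.keySet());
    }
}
```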




----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-641638502


   Test this please 


----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-641638879


   Test this please


----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-637849883


   Test this please


----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-637850232


   Test this please


----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on a change in pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#discussion_r435433337



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -302,14 +300,19 @@ public void computeTaskLags(final UUID uuid, final Map<TaskId, Long> allTaskEndO
      * @return end offset sum - offset sum
      *          Task.LATEST_OFFSET if this was previously an active running task on this client
      */
-    long lagFor(final TaskId task) {
-        final Long totalLag = taskLagTotals.get(task);
+    public long lagFor(final TaskId task) {
+        final Long totalLag;
+        if (taskLagTotals.isEmpty()) {
+            // If we couldn't compute the task lags due to failure to fetch offsets, just return a flat constant
+            totalLag = 0L;

Review comment:
       Is this the right constant to represent "we don't know the lag"? Or did I mistake how this is going to be used?




----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-639068238


   Test this please


----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#discussion_r435374125



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
##########
@@ -300,20 +302,71 @@ public void shouldNotHaveUnfulfilledQuotaWhenActiveTaskSizeGreaterEqualThanCapac
     @Test
     public void shouldAddTasksWithLatestOffsetToPrevActiveTasks() {
         final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(TASK_0_1, Task.LATEST_OFFSET);
-        client.addPreviousTasksAndOffsetSums(taskOffsetSums);
+        client.addPreviousTasksAndOffsetSums("c1", taskOffsetSums);
         client.initializePrevTasks(Collections.emptyMap());
         assertThat(client.prevActiveTasks(), equalTo(Collections.singleton(TASK_0_1)));
         assertThat(client.previousAssignedTasks(), equalTo(Collections.singleton(TASK_0_1)));
         assertTrue(client.prevStandbyTasks().isEmpty());
     }
 
+    @Test
+    public void shouldReturnPreviousStatefulTasksForConsumer() {
+        client.addPreviousTasksAndOffsetSums("c1", Collections.singletonMap(TASK_0_1, Task.LATEST_OFFSET));
+        client.addPreviousTasksAndOffsetSums("c2", Collections.singletonMap(TASK_0_2, 0L));
+        client.addPreviousTasksAndOffsetSums("c3", Collections.emptyMap());
+
+        client.initializePrevTasks(Collections.emptyMap());
+        client.computeTaskLags(
+            UUID_1,
+            mkMap(
+                mkEntry(TASK_0_1, 1_000L),
+                mkEntry(TASK_0_2, 1_000L)
+            )
+        );
+
+        assertThat(client.previousTasksForConsumer("c1"), equalTo(mkSortedSet(TASK_0_1)));
+        assertThat(client.previousTasksForConsumer("c2"), equalTo(mkSortedSet(TASK_0_2)));
+        assertTrue(client.previousTasksForConsumer("c3").isEmpty());
+    }
+
+    @Test
+    public void shouldReturnPreviousStatefulTasksForConsumerWhenLagIsNotComputed() {
+        client.addPreviousTasksAndOffsetSums("c1", Collections.singletonMap(TASK_0_1, 1000L));
+        client.initializePrevTasks(Collections.emptyMap());
+
+        assertThat(client.previousTasksForConsumer("c1"), equalTo(mkSortedSet(TASK_0_1)));
+    }
+
+    @Test
+    public void shouldReturnPreviousStatefulTasksForConsumerInIncreasingLagOrder() {

Review comment:
       You didn't miss it, I just snuck it in there after your review :P
   
   Sorry, should have called out that I made some more changes. I think that was the only significant logical change though. I'll try pulling the sort out into the assignment code




----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-638494568






----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-637578416


   Ok to test


----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-639128329






----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-641639241


   Test this please


----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-641711592


   Test this please


----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-639184354


   Java8 failed with 
   `KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration`
   
   Java14 failed with `KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled`
   
   I've seen both of these be flaky already (and frankly am a bit concerned about them...) but I'll see if I can reproduce this locally in case this PR is somehow making them worse


----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-641638765


   Test this please


----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#discussion_r435585609



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -302,14 +300,19 @@ public void computeTaskLags(final UUID uuid, final Map<TaskId, Long> allTaskEndO
      * @return end offset sum - offset sum
      *          Task.LATEST_OFFSET if this was previously an active running task on this client
      */
-    long lagFor(final TaskId task) {
-        final Long totalLag = taskLagTotals.get(task);
+    public long lagFor(final TaskId task) {
+        final Long totalLag;
+        if (taskLagTotals.isEmpty()) {
+            // If we couldn't compute the task lags due to failure to fetch offsets, just return a flat constant
+            totalLag = 0L;

Review comment:
       The value itself doesn't matter, just that it's constant across all tasks.
   
   But I'm guessing you meant, why not use the existing `UNKNOWN_OFFSET_SUM` sentinel, in which case the answer is probably just that I forgot about it. Anyway I did a slight additional refactoring beyond this, just fyi: instead of skipping the lag computation when we fail to fetch offsets, we now always initialize the lags and just pass in the `UNKNOWN_OFFSET_SUM` for all stateful tasks when the offset fetch fails.
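
A rough sketch of the "always initialize the lags" idea, under stated assumptions: `TaskLagSketch` and `computeTaskLags` here are hypothetical and much simpler than the real `ClientState` logic, tasks are plain strings, a failed offset fetch is modeled as an empty `Optional`, and the value given to `UNKNOWN_OFFSET_SUM` is only a placeholder. The point it illustrates is that every stateful task gets the same value when the fetch fails, so any lag-based ordering treats them as ties.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, simplified model of the lag initialization described above.
final class TaskLagSketch {
    // Placeholder sentinel for this sketch; the real constant lives elsewhere in Kafka Streams.
    static final long UNKNOWN_OFFSET_SUM = -3L;

    static Map<String, Long> computeTaskLags(final Set<String> statefulTasks,
                                             final Map<String, Long> offsetSums,
                                             final Optional<Map<String, Long>> endOffsetSums) {
        final Map<String, Long> lags = new TreeMap<String, Long>();
        for (final String task : statefulTasks) {
            if (!endOffsetSums.isPresent()) {
                // Offset fetch failed: the exact value is irrelevant, it only has to be
                // the same for every task.
                lags.put(task, UNKNOWN_OFFSET_SUM);
            } else {
                final long end = endOffsetSums.get().getOrDefault(task, 0L);
                final long current = offsetSums.getOrDefault(task, 0L);
                lags.put(task, Math.max(0L, end - current));
            }
        }
        return lags;
    }
}
```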




----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-637906677


   Tests failed due to the broken consumer StickyAssignor test that will be fixed via https://github.com/apache/kafka/pull/8786


----------------------------------------------------------------