Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/02 16:33:13 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8775: KAFKA-10079: improve thread-level stickiness

vvcephei commented on a change in pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#discussion_r433923120



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -938,57 +930,9 @@ private void populatePartitionsByHostMaps(final Map<HostInfo, Set<TopicPartition
         return assignment;
     }
 
-    /**
-     * Computes the assignment of tasks to threads within each client and assembles the final assignment to send out,
-     * in the special case of version probing where some members are on different versions and have sent different
-     * subscriptions.
-     *
-     * @return the final assignment for each StreamThread consumer
-     */
-    private Map<String, Assignment> versionProbingAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
-                                                             final Map<TaskId, Set<TopicPartition>> partitionsForTask,
-                                                             final Map<HostInfo, Set<TopicPartition>> partitionsByHost,
-                                                             final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
-                                                             final Set<TopicPartition> allOwnedPartitions,
-                                                             final int minUserMetadataVersion,
-                                                             final int minSupportedMetadataVersion) {
-        final Map<String, Assignment> assignment = new HashMap<>();
-
-        // Since we know another rebalance will be triggered anyway, just try and generate a balanced assignment
-        // (without violating cooperative protocol) now so that on the second rebalance we can just give tasks
-        // back to their previous owners
-        // within the client, distribute tasks to its owned consumers
-        for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
-            final ClientState state = clientMetadata.state;
-
-            final Map<String, List<TaskId>> interleavedActive =
-                interleaveConsumerTasksByGroupId(state.activeTasks(), clientMetadata.consumers);
-            final Map<String, List<TaskId>> interleavedStandby =
-                interleaveConsumerTasksByGroupId(state.standbyTasks(), clientMetadata.consumers);
-
-            addClientAssignments(
-                assignment,
-                clientMetadata,
-                partitionsForTask,
-                partitionsByHost,
-                standbyPartitionsByHost,
-                allOwnedPartitions,
-                interleavedActive,
-                interleavedStandby,
-                minUserMetadataVersion,
-                minSupportedMetadataVersion,
-                true,
-                false);
-        }
-
-        log.info("Finished unstable assignment of tasks, a followup rebalance will be scheduled due to version probing.");
-
-        return assignment;
-    }
-
     /**
      * Adds the encoded assignment for each StreamThread consumer in the client to the overall assignment map
-     * @return true if this client has been told to schedule a followup rebalance
+     * @return true if a followup rebalance will be required due to revoekd tasks

Review comment:
       ```suggestion
        * @return true if a followup rebalance will be required due to revoked tasks
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -242,8 +250,9 @@ public void addOwnedPartitions(final Collection<TopicPartition> ownedPartitions,
         }
     }
 
-    public void addPreviousTasksAndOffsetSums(final Map<TaskId, Long> taskOffsetSums) {
+    public void addPreviousTasksAndOffsetSums(final String consumerId, final Map<TaskId, Long> taskOffsetSums) {
         this.taskOffsetSums.putAll(taskOffsetSums);
+        consumerToPreviousTaskIds.put(consumerId, taskOffsetSums.keySet());

Review comment:
       We have several new methods, and also this new book-kept collection (`consumerToPreviousTaskIds`), but no new tests for them in ClientStateTest. Can you add the missing coverage?
   
   Covering the new methods is more a matter of principle. My real concern is the bookkeeping for `consumerToPreviousTaskIds`: we now have to keep two data structures consistent, and without good coverage I fear future regressions.
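
To illustrate the kind of coverage meant here, below is a minimal sketch of a consistency check between the two collections. It is not the real `ClientState`: `ClientStateSketch`, `previousTasksForConsumer`, and `bookkeepingIsConsistent` are hypothetical names, task IDs are plain strings instead of `TaskId`, and the defensive copy of the key set is a choice made for this sketch rather than something in the diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for ClientState (assumption: String task IDs).
class ClientStateSketch {
    private final Map<String, Long> taskOffsetSums = new HashMap<>();
    private final Map<String, Set<String>> consumerToPreviousTaskIds = new HashMap<>();

    // Mirrors the shape of the changed method: both collections are updated together.
    void addPreviousTasksAndOffsetSums(final String consumerId, final Map<String, Long> offsetSums) {
        taskOffsetSums.putAll(offsetSums);
        // Copy the keys so later changes to the caller's map cannot desynchronize the two collections.
        consumerToPreviousTaskIds.put(consumerId, new HashSet<>(offsetSums.keySet()));
    }

    // Hypothetical accessor: the previous tasks recorded for one consumer.
    Set<String> previousTasksForConsumer(final String consumerId) {
        return consumerToPreviousTaskIds.getOrDefault(consumerId, Collections.emptySet());
    }

    // Invariant a test could assert: the union of per-consumer tasks equals the offset-sum keys.
    boolean bookkeepingIsConsistent() {
        final Set<String> fromConsumers = new HashSet<>();
        for (final Set<String> tasks : consumerToPreviousTaskIds.values()) {
            fromConsumers.addAll(tasks);
        }
        return fromConsumers.equals(taskOffsetSums.keySet());
    }
}
```

A test in ClientStateTest could follow the same pattern: add previous tasks for two or more consumers, then assert both the per-consumer view and the combined invariant.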



