You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/07/10 23:52:19 UTC

[kafka] branch 2.6 updated: KAFKA-10263: Do not assign standby for revoking stateless tasks (#9005)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new eee2ec5  KAFKA-10263: Do not assign standby for revoking stateless tasks (#9005)
eee2ec5 is described below

commit eee2ec5d8e850d7f2b1ee7e061784cd4439adef4
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Fri Jul 10 16:51:00 2020 -0700

    KAFKA-10263: Do not assign standby for revoking stateless tasks (#9005)
    
    Also piggy-back a small fix to use TreeMap rather than HashMap to preserve iteration ordering.
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, John Roesler <vv...@apache.org>
---
 .../internals/StreamsPartitionAssignor.java        | 61 ++++++++++++++--------
 .../internals/StreamsPartitionAssignorTest.java    |  9 +++-
 2 files changed, 45 insertions(+), 25 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 9352a3b..304ec30 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -65,6 +65,7 @@ import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -370,8 +371,10 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         final Map<TaskId, Set<TopicPartition>> partitionsForTask =
             partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata);
 
+        final Set<TaskId> statefulTasks = new HashSet<>();
+
         final boolean probingRebalanceNeeded =
-            assignTasksToClients(allSourceTopics, partitionsForTask, topicGroups, clientMetadataMap, fullMetadata);
+            assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);
 
         // ---------------- Step Three ---------------- //
 
@@ -389,6 +392,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         // compute the assignment of tasks to threads within each client and build the final group assignment
 
         final Map<String, Assignment> assignment = computeNewAssignment(
+                statefulTasks,
                 clientMetadataMap,
                 partitionsForTask,
                 partitionsByHost,
@@ -452,7 +456,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         final Map<String, InternalTopicConfig> repartitionTopicMetadata = new HashMap<>();
         for (final TopicsInfo topicsInfo : topicGroups.values()) {
             for (final String topic : topicsInfo.sourceTopics) {
-                if (!topicsInfo.repartitionSourceTopics.keySet().contains(topic) &&
+                if (!topicsInfo.repartitionSourceTopics.containsKey(topic) &&
                         !metadata.topics().contains(topic)) {
                     log.error("Source topic {} is missing/unknown during rebalance, please make sure all source topics " +
                                   "have been pre-created before starting the Streams application. Returning error {}",
@@ -685,14 +689,19 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
     }
 
     /**
-     * Assigns a set of tasks to each client (Streams instance) using the configured task assignor
+     * Assigns a set of tasks to each client (Streams instance) using the configured task assignor, and also
+     * populates the stateful tasks that have been assigned to the clients
      * @return true if a probing rebalance should be triggered
      */
-    private boolean assignTasksToClients(final Set<String> allSourceTopics,
-                                         final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+    private boolean assignTasksToClients(final Cluster fullMetadata,
+                                         final Set<String> allSourceTopics,
                                          final Map<Integer, TopicsInfo> topicGroups,
                                          final Map<UUID, ClientMetadata> clientMetadataMap,
-                                         final Cluster fullMetadata) {
+                                         final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                         final Set<TaskId> statefulTasks) {
+        if (!statefulTasks.isEmpty())
+            throw new IllegalArgumentException("The stateful tasks should not be populated before assigning tasks to clients");
+
         final Map<TopicPartition, TaskId> taskForPartition = new HashMap<>();
         final Map<Integer, Set<TaskId>> tasksForTopicGroup = new HashMap<>();
         populateTasksForMaps(taskForPartition, tasksForTopicGroup, allSourceTopics, partitionsForTask, fullMetadata);
@@ -713,7 +722,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             );
 
         final Set<TaskId> allTasks = partitionsForTask.keySet();
-        final Set<TaskId> statefulTasks = changelogsByStatefulTask.keySet();
+        statefulTasks.addAll(changelogsByStatefulTask.keySet());
 
         log.debug("Assigning tasks {} to clients {} with number of replicas {}",
             allTasks, clientStates, numStandbyReplicas());
@@ -898,7 +907,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
      *
      * @return the final assignment for each StreamThread consumer
      */
-    private Map<String, Assignment> computeNewAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
+    private Map<String, Assignment> computeNewAssignment(final Set<TaskId> statefulTasks,
+                                                         final Map<UUID, ClientMetadata> clientsMetadata,
                                                          final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                                          final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
                                                          final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
@@ -935,6 +945,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             final boolean encodeNextProbingRebalanceTime = shouldTriggerProbingRebalance && clientId.equals(taskManager.processId());
 
             final boolean tasksRevoked = addClientAssignments(
+                statefulTasks,
                 assignment,
                 clientMetadata,
                 partitionsForTask,
@@ -969,7 +980,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
      * Adds the encoded assignment for each StreamThread consumer in the client to the overall assignment map
      * @return true if a followup rebalance will be required due to revoked tasks
      */
-    private boolean addClientAssignments(final Map<String, Assignment> assignment,
+    private boolean addClientAssignments(final Set<TaskId> statefulTasks,
+                                         final Map<String, Assignment> assignment,
                                          final ClientMetadata clientMetadata,
                                          final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                          final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
@@ -1007,6 +1019,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                     consumer,
                     standbyTaskAssignments.get(consumer),
                     activeTasksRemovedPendingRevokation,
+                    statefulTasks,
                     partitionsForTask,
                     clientMetadata.state
                 );
@@ -1107,25 +1120,26 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
      * @return map from task id to its assigned partitions for all standby tasks
      */
     private Map<TaskId, Set<TopicPartition>> buildStandbyTaskMap(final String consumer,
-                                                                 final Iterable<TaskId> standbys,
-                                                                 final Iterable<TaskId> tasksRevoked,
+                                                                 final Iterable<TaskId> standbyTasks,
+                                                                 final Iterable<TaskId> revokedTasks,
+                                                                 final Set<TaskId> allStatefulTasks,
                                                                  final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                                                  final ClientState clientState) {
         final Map<TaskId, Set<TopicPartition>> standbyTaskMap = new HashMap<>();
-        for (final TaskId task : standbys) {
+        for (final TaskId task : standbyTasks) {
             standbyTaskMap.put(task, partitionsForTask.get(task));
         }
-        for (final TaskId task : tasksRevoked) {
-            log.info(
-                "Adding removed active task {} as a standby for {} until it is safely revoked in followup rebalance",
-                task,
-                consumer
-            );
-            standbyTaskMap.put(task, partitionsForTask.get(task));
+        for (final TaskId task : revokedTasks) {
+            if (allStatefulTasks.contains(task)) {
+                log.info("Adding removed stateful active task {} as a standby for {} before it is safely revoked in followup rebalance",
+                        task, consumer
+                );
+                standbyTaskMap.put(task, partitionsForTask.get(task));
 
-            // This has no effect on the assignment, as we'll never consult the ClientState again, but
-            // it does perform a useful assertion that the it's legal to assign this task as a standby to this instance
-            clientState.assignStandby(task);
+                // This has no effect on the assignment, as we'll never consult the ClientState again, but
+                // it does perform a useful assertion that it's legal to assign this task as a standby to this instance
+                clientState.assignStandby(task);
+            }
         }
         return standbyTaskMap;
     }
@@ -1154,7 +1168,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
-        final Map<TaskId, String> unassignedTaskToPreviousOwner = new HashMap<>();
+        // using a TreeMap so that iteration over the keys follows a deterministic (sorted) order
+        final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
         if (!unassignedStatefulTasks.isEmpty()) {
             // First assign stateful tasks to previous owner, up to the min expected tasks/thread
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index b3d4169..6d68498 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -1382,12 +1382,13 @@ public class StreamsPartitionAssignorTest {
         builder.addSource(null, "source1", null, null, null, "topic1");
 
         final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
+        final List<TopicPartition> allPartitions = asList(t1p0, t1p1, t1p2);
 
         subscriptions.put(CONSUMER_1,
                           new Subscription(
                               Collections.singletonList("topic1"),
                               getInfo(UUID_1, allTasks, EMPTY_TASKS).encode(),
-                              asList(t1p0, t1p1, t1p2))
+                              allPartitions)
         );
         subscriptions.put(CONSUMER_2,
                           new Subscription(
@@ -1402,9 +1403,13 @@ public class StreamsPartitionAssignorTest {
         final Map<String, Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
 
         // Verify at least one partition was revoked
-        assertThat(assignment.get(CONSUMER_1).partitions(), not(allTasks));
+        assertThat(assignment.get(CONSUMER_1).partitions(), not(allPartitions));
         assertThat(assignment.get(CONSUMER_2).partitions(), equalTo(emptyList()));
 
+        // Verify that stateless revoked tasks would not be assigned as standbys
+        assertThat(AssignmentInfo.decode(assignment.get(CONSUMER_2).userData()).activeTasks(), equalTo(emptyList()));
+        assertThat(AssignmentInfo.decode(assignment.get(CONSUMER_2).userData()).standbyTasks(), equalTo(emptyMap()));
+
         partitionAssignor.onAssignment(assignment.get(CONSUMER_2), null);
 
         assertThat(nextScheduledRebalanceMs.get(), is(0L));