You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/07/10 23:52:19 UTC
[kafka] branch 2.6 updated: KAFKA-10263: Do not assign standby for
revoking stateless tasks (#9005)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new eee2ec5 KAFKA-10263: Do not assign standby for revoking stateless tasks (#9005)
eee2ec5 is described below
commit eee2ec5d8e850d7f2b1ee7e061784cd4439adef4
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Fri Jul 10 16:51:00 2020 -0700
KAFKA-10263: Do not assign standby for revoking stateless tasks (#9005)
Also piggy-back a small fix to use TreeMap rather than HashMap to preserve iteration ordering.
Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, John Roesler <vv...@apache.org>
---
.../internals/StreamsPartitionAssignor.java | 61 ++++++++++++++--------
.../internals/StreamsPartitionAssignorTest.java | 9 +++-
2 files changed, 45 insertions(+), 25 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 9352a3b..304ec30 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -65,6 +65,7 @@ import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@@ -370,8 +371,10 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final Map<TaskId, Set<TopicPartition>> partitionsForTask =
partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata);
+ final Set<TaskId> statefulTasks = new HashSet<>();
+
final boolean probingRebalanceNeeded =
- assignTasksToClients(allSourceTopics, partitionsForTask, topicGroups, clientMetadataMap, fullMetadata);
+ assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);
// ---------------- Step Three ---------------- //
@@ -389,6 +392,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
// compute the assignment of tasks to threads within each client and build the final group assignment
final Map<String, Assignment> assignment = computeNewAssignment(
+ statefulTasks,
clientMetadataMap,
partitionsForTask,
partitionsByHost,
@@ -452,7 +456,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final Map<String, InternalTopicConfig> repartitionTopicMetadata = new HashMap<>();
for (final TopicsInfo topicsInfo : topicGroups.values()) {
for (final String topic : topicsInfo.sourceTopics) {
- if (!topicsInfo.repartitionSourceTopics.keySet().contains(topic) &&
+ if (!topicsInfo.repartitionSourceTopics.containsKey(topic) &&
!metadata.topics().contains(topic)) {
log.error("Source topic {} is missing/unknown during rebalance, please make sure all source topics " +
"have been pre-created before starting the Streams application. Returning error {}",
@@ -685,14 +689,19 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
}
/**
- * Assigns a set of tasks to each client (Streams instance) using the configured task assignor
+ * Assigns a set of tasks to each client (Streams instance) using the configured task assignor, and also
+ * populate the stateful tasks that have been assigned to the clients
* @return true if a probing rebalance should be triggered
*/
- private boolean assignTasksToClients(final Set<String> allSourceTopics,
- final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+ private boolean assignTasksToClients(final Cluster fullMetadata,
+ final Set<String> allSourceTopics,
final Map<Integer, TopicsInfo> topicGroups,
final Map<UUID, ClientMetadata> clientMetadataMap,
- final Cluster fullMetadata) {
+ final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+ final Set<TaskId> statefulTasks) {
+ if (!statefulTasks.isEmpty())
+ throw new IllegalArgumentException("The stateful tasks should not be populated before assigning tasks to clients");
+
final Map<TopicPartition, TaskId> taskForPartition = new HashMap<>();
final Map<Integer, Set<TaskId>> tasksForTopicGroup = new HashMap<>();
populateTasksForMaps(taskForPartition, tasksForTopicGroup, allSourceTopics, partitionsForTask, fullMetadata);
@@ -713,7 +722,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
);
final Set<TaskId> allTasks = partitionsForTask.keySet();
- final Set<TaskId> statefulTasks = changelogsByStatefulTask.keySet();
+ statefulTasks.addAll(changelogsByStatefulTask.keySet());
log.debug("Assigning tasks {} to clients {} with number of replicas {}",
allTasks, clientStates, numStandbyReplicas());
@@ -898,7 +907,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
*
* @return the final assignment for each StreamThread consumer
*/
- private Map<String, Assignment> computeNewAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
+ private Map<String, Assignment> computeNewAssignment(final Set<TaskId> statefulTasks,
+ final Map<UUID, ClientMetadata> clientsMetadata,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
@@ -935,6 +945,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final boolean encodeNextProbingRebalanceTime = shouldTriggerProbingRebalance && clientId.equals(taskManager.processId());
final boolean tasksRevoked = addClientAssignments(
+ statefulTasks,
assignment,
clientMetadata,
partitionsForTask,
@@ -969,7 +980,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
* Adds the encoded assignment for each StreamThread consumer in the client to the overall assignment map
* @return true if a followup rebalance will be required due to revoked tasks
*/
- private boolean addClientAssignments(final Map<String, Assignment> assignment,
+ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
+ final Map<String, Assignment> assignment,
final ClientMetadata clientMetadata,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
@@ -1007,6 +1019,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
consumer,
standbyTaskAssignments.get(consumer),
activeTasksRemovedPendingRevokation,
+ statefulTasks,
partitionsForTask,
clientMetadata.state
);
@@ -1107,25 +1120,26 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
* @return map from task id to its assigned partitions for all standby tasks
*/
private Map<TaskId, Set<TopicPartition>> buildStandbyTaskMap(final String consumer,
- final Iterable<TaskId> standbys,
- final Iterable<TaskId> tasksRevoked,
+ final Iterable<TaskId> standbyTasks,
+ final Iterable<TaskId> revokedTasks,
+ final Set<TaskId> allStatefulTasks,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final ClientState clientState) {
final Map<TaskId, Set<TopicPartition>> standbyTaskMap = new HashMap<>();
- for (final TaskId task : standbys) {
+ for (final TaskId task : standbyTasks) {
standbyTaskMap.put(task, partitionsForTask.get(task));
}
- for (final TaskId task : tasksRevoked) {
- log.info(
- "Adding removed active task {} as a standby for {} until it is safely revoked in followup rebalance",
- task,
- consumer
- );
- standbyTaskMap.put(task, partitionsForTask.get(task));
+ for (final TaskId task : revokedTasks) {
+ if (allStatefulTasks.contains(task)) {
+ log.info("Adding removed stateful active task {} as a standby for {} before it is safely revoked in followup rebalance",
+ task, consumer
+ );
+ standbyTaskMap.put(task, partitionsForTask.get(task));
- // This has no effect on the assignment, as we'll never consult the ClientState again, but
- // it does perform a useful assertion that the it's legal to assign this task as a standby to this instance
- clientState.assignStandby(task);
+ // This has no effect on the assignment, as we'll never consult the ClientState again, but
+ // it does perform a useful assertion that the it's legal to assign this task as a standby to this instance
+ clientState.assignStandby(task);
+ }
}
return standbyTaskMap;
}
@@ -1154,7 +1168,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final Queue<String> consumersToFill = new LinkedList<>();
// keep track of tasks that we have to skip during the first pass in case we can reassign them later
- final Map<TaskId, String> unassignedTaskToPreviousOwner = new HashMap<>();
+ // using tree-map to make sure the iteration ordering over keys are preserved
+ final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
if (!unassignedStatefulTasks.isEmpty()) {
// First assign stateful tasks to previous owner, up to the min expected tasks/thread
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index b3d4169..6d68498 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -1382,12 +1382,13 @@ public class StreamsPartitionAssignorTest {
builder.addSource(null, "source1", null, null, null, "topic1");
final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
+ final List<TopicPartition> allPartitions = asList(t1p0, t1p1, t1p2);
subscriptions.put(CONSUMER_1,
new Subscription(
Collections.singletonList("topic1"),
getInfo(UUID_1, allTasks, EMPTY_TASKS).encode(),
- asList(t1p0, t1p1, t1p2))
+ allPartitions)
);
subscriptions.put(CONSUMER_2,
new Subscription(
@@ -1402,9 +1403,13 @@ public class StreamsPartitionAssignorTest {
final Map<String, Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
// Verify at least one partition was revoked
- assertThat(assignment.get(CONSUMER_1).partitions(), not(allTasks));
+ assertThat(assignment.get(CONSUMER_1).partitions(), not(allPartitions));
assertThat(assignment.get(CONSUMER_2).partitions(), equalTo(emptyList()));
+ // Verify that stateless revoked tasks would not be assigned as standbys
+ assertThat(AssignmentInfo.decode(assignment.get(CONSUMER_2).userData()).activeTasks(), equalTo(emptyList()));
+ assertThat(AssignmentInfo.decode(assignment.get(CONSUMER_2).userData()).standbyTasks(), equalTo(emptyMap()));
+
partitionAssignor.onAssignment(assignment.get(CONSUMER_2), null);
assertThat(nextScheduledRebalanceMs.get(), is(0L));