You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/05/08 18:46:19 UTC
[kafka] branch trunk updated: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder (#13638)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7634eee2627 KAFKA-14462; [11/N] Add CurrentAssignmentBuilder (#13638)
7634eee2627 is described below
commit 7634eee2627da39937e3112ffc58bd7cfedc98f2
Author: David Jacot <dj...@confluent.io>
AuthorDate: Mon May 8 20:46:07 2023 +0200
KAFKA-14462; [11/N] Add CurrentAssignmentBuilder (#13638)
This patch adds the `CurrentAssignmentBuilder` class which encapsulates the reconciliation engine of the consumer group protocol. Given the current state of a member and a desired or target assignment state, the state machine takes the necessary steps to converge the member to its desired state.
Reviewers: Ritika Reddy <rr...@confluent.io>, Calvin Liu <ca...@confluent.io>, Jeff Kim <je...@confluent.io>, Justine Olshan <jo...@confluent.io>
---
.../group/consumer/ConsumerGroupMember.java | 4 +
.../group/consumer/CurrentAssignmentBuilder.java | 445 ++++++++++++++++
.../consumer/CurrentAssignmentBuilderTest.java | 558 +++++++++++++++++++++
3 files changed, 1007 insertions(+)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java
index b2e4fcb782e..c40bb7c937c 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java
@@ -264,6 +264,10 @@ public class ConsumerGroupMember {
}
}
+ /**
+ * The various states that a member can be in. For their definition,
+ * refer to the documentation of {{@link CurrentAssignmentBuilder}}.
+ */
public enum MemberState {
REVOKING("revoking"),
ASSIGNING("assigning"),
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java
new file mode 100644
index 00000000000..6a255ae8e53
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the
+ * consumer group protocol. Given the current state of a member and a desired or target
+ * assignment state, the state machine takes the necessary steps to converge them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch:
+ * The current epoch of the member.
+ *
+ * - Next Epoch:
+ * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
+ * The member transitions to this epoch when it has revoked the partitions that it does not own
+ * or if it does not have to revoke any.
+ *
+ * - Previous Epoch:
+ * The epoch of the member when the state was last updated.
+ *
+ * - Assigned Partitions:
+ * The set of partitions currently assigned to the member. This represents what the member should have.
+ *
+ * - Partitions Pending Revocation:
+ * The set of partitions that the member should revoke before it can transition to the next state.
+ *
+ * - Partitions Pending Assignment:
+ * The set of partitions that the member will eventually receive. The partitions in this set are
+ * still owned by other members in the group.
+ *
+ * The state machine has three states:
+ * - REVOKING:
+ * This state means that the member must revoke partitions before it can transition to the next epoch
+ * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions
+ * are committed with the current epoch. The member transitions to the next state only when it has
+ * acknowledged the revocation.
+ *
+ * - ASSIGNING:
+ * This state means that the member waits on partitions which are still owned by other members in the
+ * group. It remains in this state until they are all freed up.
+ *
+ * - STABLE:
+ * This state means that the member has received all its assigned partitions.
+ *
+ * The reconciliation process is started or re-started whenever a new target assignment is installed;
+ * the epoch of the new target assignment is different from the next epoch of the member. In this transient
+ * state, the assigned partitions, the partitions pending revocation and the partitions pending assignment
+ * are updated. If the partitions pending revocation is not empty, the state machine transitions to
+ * REVOKING; if partitions pending assignment is not empty, it transitions to ASSIGNING; otherwise it
+ * transitions to STABLE.
+ */
+public class CurrentAssignmentBuilder {
+ /**
+ * The consumer group member which is reconciled.
+ */
+ private final ConsumerGroupMember member;
+
+ /**
+ * The target assignment epoch.
+ */
+ private int targetAssignmentEpoch;
+
+ /**
+ * The target assignment.
+ */
+ private Assignment targetAssignment;
+
+ /**
+ * A function which returns the current epoch of a topic-partition or -1 if the
+ * topic-partition is not assigned. The current epoch is the epoch of the current owner.
+ */
+ private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch;
+
+ /**
+ * The partitions owned by the consumer. This is directly provided by the member in the
+ * ConsumerGroupHeartbeat request.
+ */
+ private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions;
+
+ /**
+ * Constructs the CurrentAssignmentBuilder based on the current state of the
+ * provided consumer group member.
+ *
+ * @param member The consumer group member that must be reconciled.
+ */
+ public CurrentAssignmentBuilder(ConsumerGroupMember member) {
+ this.member = Objects.requireNonNull(member);
+ }
+
+ /**
+ * Sets the target assignment epoch and the target assignment that the
+ * consumer group member must be reconciled to.
+ *
+ * @param targetAssignmentEpoch The target assignment epoch.
+ * @param targetAssignment The target assignment.
+ * @return This object.
+ */
+ public CurrentAssignmentBuilder withTargetAssignment(
+ int targetAssignmentEpoch,
+ Assignment targetAssignment
+ ) {
+ this.targetAssignmentEpoch = targetAssignmentEpoch;
+ this.targetAssignment = Objects.requireNonNull(targetAssignment);
+ return this;
+ }
+
+ /**
+ * Sets a BiFunction which allows to retrieve the current epoch of a
+ * partition. This is used by the state machine to determine if a
+ * partition is free or still used by another member.
+ *
+ * @param currentPartitionEpoch A BiFunction which gets the epoch of a
+ * topic id / partitions id pair.
+ * @return This object.
+ */
+ public CurrentAssignmentBuilder withCurrentPartitionEpoch(
+ BiFunction<Uuid, Integer, Integer> currentPartitionEpoch
+ ) {
+ this.currentPartitionEpoch = Objects.requireNonNull(currentPartitionEpoch);
+ return this;
+ }
+
+ /**
+ * Sets the partitions currently owned by the member. This comes directly
+ * from the last ConsumerGroupHeartbeat request. This is used to determine
+ * if the member has revoked the necessary partitions.
+ *
+ * @param ownedTopicPartitions A list of topic-partitions.
+ * @return This object.
+ */
+ public CurrentAssignmentBuilder withOwnedTopicPartitions(
+ List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions
+ ) {
+ this.ownedTopicPartitions = ownedTopicPartitions;
+ return this;
+ }
+
+ /**
+ * Builds the next state for the member or keep the current one if it
+ * is not possible to move forward with the current state.
+ *
+ * @return A new ConsumerGroupMember or the current one.
+ */
+ public ConsumerGroupMember build() {
+ // A new target assignment has been installed, we need to restart
+ // the reconciliation loop from the beginning.
+ if (targetAssignmentEpoch != member.nextMemberEpoch()) {
+ return transitionToNewTargetAssignmentState();
+ }
+
+ switch (member.state()) {
+ // Check if the partitions have been revoked by the member.
+ case REVOKING:
+ return maybeTransitionFromRevokingToAssigningOrStable();
+
+ // Check if pending partitions have been freed up.
+ case ASSIGNING:
+ return maybeTransitionFromAssigningToAssigningOrStable();
+
+ // Nothing to do.
+ case STABLE:
+ return member;
+ }
+
+ return member;
+ }
+
+ /**
+ * Transitions to NewTargetAssignment state. This is a transient state where
+ * we compute the assigned partitions, the partitions pending revocation,
+ * the partitions pending assignment, and transition to the next state.
+ *
+ * @return A new ConsumerGroupMember.
+ */
+ private ConsumerGroupMember transitionToNewTargetAssignmentState() {
+ Map<Uuid, Set<Integer>> newAssignedPartitions = new HashMap<>();
+ Map<Uuid, Set<Integer>> newPartitionsPendingRevocation = new HashMap<>();
+ Map<Uuid, Set<Integer>> newPartitionsPendingAssignment = new HashMap<>();
+
+ // Compute the combined set of topics.
+ Set<Uuid> allTopicIds = new HashSet<>(targetAssignment.partitions().keySet());
+ allTopicIds.addAll(member.assignedPartitions().keySet());
+ allTopicIds.addAll(member.partitionsPendingRevocation().keySet());
+ allTopicIds.addAll(member.partitionsPendingAssignment().keySet());
+
+ for (Uuid topicId : allTopicIds) {
+ Set<Integer> target = targetAssignment.partitions()
+ .getOrDefault(topicId, Collections.emptySet());
+ Set<Integer> currentAssignedPartitions = member.assignedPartitions()
+ .getOrDefault(topicId, Collections.emptySet());
+ Set<Integer> currentRevokingPartitions = member.partitionsPendingRevocation()
+ .getOrDefault(topicId, Collections.emptySet());
+
+ // Assigned_1 = (Assigned_0 + Pending_Revocation_0) ∩ Target
+ // Assigned_0 + Pending_Revocation_0 is used here because the partitions
+ // being revoked are still owned until the revocation is acknowledged.
+ Set<Integer> assignedPartitions = new HashSet<>(currentAssignedPartitions);
+ assignedPartitions.addAll(currentRevokingPartitions);
+ assignedPartitions.retainAll(target);
+
+ // Pending_Revocation_1 = (Assigned_0 + Pending_Revocation_0) - Assigned_1
+ // Assigned_0 + Pending_Revocation_0 is used here because the partitions
+ // being revoked are still owned until the revocation is acknowledged.
+ Set<Integer> partitionsPendingRevocation = new HashSet<>(currentAssignedPartitions);
+ partitionsPendingRevocation.addAll(currentRevokingPartitions);
+ partitionsPendingRevocation.removeAll(assignedPartitions);
+
+ // Pending_Assignment_1 = Target - Assigned_1
+ Set<Integer> partitionsPendingAssignment = new HashSet<>(target);
+ partitionsPendingAssignment.removeAll(assignedPartitions);
+
+ if (!assignedPartitions.isEmpty()) {
+ newAssignedPartitions.put(topicId, assignedPartitions);
+ }
+
+ if (!partitionsPendingRevocation.isEmpty()) {
+ newPartitionsPendingRevocation.put(topicId, partitionsPendingRevocation);
+ }
+
+ if (!partitionsPendingAssignment.isEmpty()) {
+ newPartitionsPendingAssignment.put(topicId, partitionsPendingAssignment);
+ }
+ }
+
+ if (!newPartitionsPendingRevocation.isEmpty()) {
+ // If the partition pending revocation set is not empty, we transition the
+ // member to revoking and keep the current epoch. The transition to the new
+ // state is done when the member is updated.
+ return new ConsumerGroupMember.Builder(member)
+ .setAssignedPartitions(newAssignedPartitions)
+ .setPartitionsPendingRevocation(newPartitionsPendingRevocation)
+ .setPartitionsPendingAssignment(newPartitionsPendingAssignment)
+ .setNextMemberEpoch(targetAssignmentEpoch)
+ .build();
+ } else {
+ if (!newPartitionsPendingAssignment.isEmpty()) {
+ // If the partitions pending assignment set is not empty, we check
+ // if some or all partitions are free to use. If they are, we move
+ // them to the partitions assigned set.
+ maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment);
+ }
+
+ // We transition to the target epoch. If the partitions pending assignment
+ // set is empty, the member transition to stable, otherwise to assigning.
+ // The transition to the new state is done when the member is updated.
+ return new ConsumerGroupMember.Builder(member)
+ .setAssignedPartitions(newAssignedPartitions)
+ .setPartitionsPendingRevocation(Collections.emptyMap())
+ .setPartitionsPendingAssignment(newPartitionsPendingAssignment)
+ .setPreviousMemberEpoch(member.memberEpoch())
+ .setMemberEpoch(targetAssignmentEpoch)
+ .setNextMemberEpoch(targetAssignmentEpoch)
+ .build();
+ }
+ }
+
+ /**
+ * Tries to transition from Revoke to Assigning or Stable. This is only
+ * possible when the member acknowledges that it only owns the partition
+ * in the assigned partitions.
+ *
+ * @return A new ConsumerGroupMember with the new state or the current one
+ * if the member stays in the current state.
+ */
+ private ConsumerGroupMember maybeTransitionFromRevokingToAssigningOrStable() {
+ if (member.partitionsPendingRevocation().isEmpty() || matchesAssignedPartitions(ownedTopicPartitions)) {
+ Map<Uuid, Set<Integer>> newAssignedPartitions = deepCopy(member.assignedPartitions());
+ Map<Uuid, Set<Integer>> newPartitionsPendingAssignment = deepCopy(member.partitionsPendingAssignment());
+
+ if (!newPartitionsPendingAssignment.isEmpty()) {
+ // If the partitions pending assignment set is not empty, we check
+ // if some or all partitions are free to use. If they are, we move
+ // them to the assigned set.
+ maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment);
+ }
+
+ // We transition to the target epoch. If the partitions pending assignment
+ // set is empty, the member transition to stable, otherwise to assigning.
+ // The transition to the new state is done when the member is updated.
+ return new ConsumerGroupMember.Builder(member)
+ .setAssignedPartitions(newAssignedPartitions)
+ .setPartitionsPendingRevocation(Collections.emptyMap())
+ .setPartitionsPendingAssignment(newPartitionsPendingAssignment)
+ .setPreviousMemberEpoch(member.memberEpoch())
+ .setMemberEpoch(targetAssignmentEpoch)
+ .setNextMemberEpoch(targetAssignmentEpoch)
+ .build();
+ } else {
+ return member;
+ }
+ }
+
+ /**
+ * Tries to transition from Assigning to Assigning or Stable. This is only
+ * possible when one or more partitions in the partitions pending assignment
+ * set have been freed up by other members in the group.
+ *
+ * @return A new ConsumerGroupMember with the new state or the current one
+ * if the member stays in the current state.
+ */
+ private ConsumerGroupMember maybeTransitionFromAssigningToAssigningOrStable() {
+ Map<Uuid, Set<Integer>> newAssignedPartitions = deepCopy(member.assignedPartitions());
+ Map<Uuid, Set<Integer>> newPartitionsPendingAssignment = deepCopy(member.partitionsPendingAssignment());
+
+ // If any partition can transition from assigning to assigned, we update
+ // the member. Otherwise, we return the current one. The transition to the
+ // new state is done when the member is updated.
+ if (maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment)) {
+ return new ConsumerGroupMember.Builder(member)
+ .setAssignedPartitions(newAssignedPartitions)
+ .setPartitionsPendingRevocation(Collections.emptyMap())
+ .setPartitionsPendingAssignment(newPartitionsPendingAssignment)
+ .setPreviousMemberEpoch(member.memberEpoch())
+ .setMemberEpoch(targetAssignmentEpoch)
+ .setNextMemberEpoch(targetAssignmentEpoch)
+ .build();
+ } else {
+ return member;
+ }
+ }
+
+ /**
+ * Tries to move partitions from the partitions pending assignment set to
+ * the partitions assigned set if they are no longer owned.
+ *
+ * @param newAssignedPartitions The assigned partitions.
+ * @param newPartitionsPendingAssignment The partitions pending assignment.
+ * @return A boolean indicating if any partitions were moved.
+ */
+ private boolean maybeAssignPendingPartitions(
+ Map<Uuid, Set<Integer>> newAssignedPartitions,
+ Map<Uuid, Set<Integer>> newPartitionsPendingAssignment
+ ) {
+ boolean changed = false;
+
+ Iterator<Map.Entry<Uuid, Set<Integer>>> assigningSetIterator =
+ newPartitionsPendingAssignment.entrySet().iterator();
+
+ while (assigningSetIterator.hasNext()) {
+ Map.Entry<Uuid, Set<Integer>> pair = assigningSetIterator.next();
+ Uuid topicId = pair.getKey();
+ Set<Integer> assigning = pair.getValue();
+
+ Iterator<Integer> assigningIterator = assigning.iterator();
+ while (assigningIterator.hasNext()) {
+ Integer partitionId = assigningIterator.next();
+
+ // A partition can be assigned to this member iff it has been
+ // released by its previous owner. This is signaled by -1.
+ Integer partitionEpoch = currentPartitionEpoch.apply(topicId, partitionId);
+ if (partitionEpoch == -1) {
+ assigningIterator.remove();
+ put(newAssignedPartitions, topicId, partitionId);
+ changed = true;
+ }
+ }
+
+ if (assigning.isEmpty()) {
+ assigningSetIterator.remove();
+ }
+ }
+
+ return changed;
+ }
+
+ /**
+ * Checks whether the owned topic partitions passed by the member to the state
+ * machine via the ConsumerGroupHeartbeat request corresponds to the assigned
+ * partitions.
+ *
+ * @param ownedTopicPartitions The topic partitions owned by the remove client.
+ * @return A boolean indicating if the owned partitions matches the Assigned set.
+ */
+ private boolean matchesAssignedPartitions(
+ List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions
+ ) {
+ if (ownedTopicPartitions == null) return false;
+ if (ownedTopicPartitions.size() != member.assignedPartitions().size()) return false;
+
+ for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) {
+ Set<Integer> partitions = member.assignedPartitions().get(topicPartitions.topicId());
+ if (partitions == null) return false;
+ for (Integer partitionId : topicPartitions.partitions()) {
+ if (!partitions.contains(partitionId)) return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Makes a deep copy of an assignment map.
+ *
+ * @param map The Map to copy.
+ * @return The copy.
+ */
+ private Map<Uuid, Set<Integer>> deepCopy(Map<Uuid, Set<Integer>> map) {
+ Map<Uuid, Set<Integer>> copy = new HashMap<>();
+ map.forEach((topicId, partitions) -> copy.put(topicId, new HashSet<>(partitions)));
+ return copy;
+ }
+
+ /**
+ * Puts the given TopicId and Partitions to the given map.
+ */
+ private void put(
+ Map<Uuid, Set<Integer>> map,
+ Uuid topicId,
+ Integer partitionId
+ ) {
+ map.compute(topicId, (__, partitionsOrNull) -> {
+ if (partitionsOrNull == null) partitionsOrNull = new HashSet<>();
+ partitionsOrNull.add(partitionId);
+ return partitionsOrNull;
+ });
+ }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java
new file mode 100644
index 00000000000..b67c68e6425
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CurrentAssignmentBuilderTest {
+
+ @Test
+ public void testTransitionFromNewTargetToRevoke() {
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setNextMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2, 3),
+ mkTopicAssignment(topicId2, 4, 5, 6)))
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+ Assignment targetAssignment = new Assignment(mkAssignment(
+ mkTopicAssignment(topicId1, 3, 4, 5),
+ mkTopicAssignment(topicId2, 6, 7, 8)
+ ));
+
+ ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(11, targetAssignment)
+ .withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state());
+ assertEquals(10, updatedMember.previousMemberEpoch());
+ assertEquals(10, updatedMember.memberEpoch());
+ assertEquals(11, updatedMember.nextMemberEpoch());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId1, 3),
+ mkTopicAssignment(topicId2, 6)
+ ), updatedMember.assignedPartitions());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2),
+ mkTopicAssignment(topicId2, 4, 5)
+ ), updatedMember.partitionsPendingRevocation());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId1, 4, 5),
+ mkTopicAssignment(topicId2, 7, 8)
+ ), updatedMember.partitionsPendingAssignment());
+ }
+
+ @Test
+ public void testTransitionFromNewTargetToAssigning() {
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setNextMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2, 3),
+ mkTopicAssignment(topicId2, 4, 5, 6)))
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+ Assignment targetAssignment = new Assignment(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2, 3, 4, 5),
+ mkTopicAssignment(topicId2, 4, 5, 6, 7, 8)
+ ));
+
+ ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(11, targetAssignment)
+ .withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state());
+ assertEquals(10, updatedMember.previousMemberEpoch());
+ assertEquals(11, updatedMember.memberEpoch());
+ assertEquals(11, updatedMember.nextMemberEpoch());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2, 3),
+ mkTopicAssignment(topicId2, 4, 5, 6)
+ ), updatedMember.assignedPartitions());
+ assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingRevocation());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId1, 4, 5),
+ mkTopicAssignment(topicId2, 7, 8)
+ ), updatedMember.partitionsPendingAssignment());
+ }
+
+ @Test
+ public void testTransitionFromNewTargetToStable() {
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setNextMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2, 3),
+ mkTopicAssignment(topicId2, 4, 5, 6)))
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+ Assignment targetAssignment = new Assignment(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2, 3),
+ mkTopicAssignment(topicId2, 4, 5, 6)
+ ));
+
+ ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(11, targetAssignment)
+ .withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.STABLE, updatedMember.state());
+ assertEquals(10, updatedMember.previousMemberEpoch());
+ assertEquals(11, updatedMember.memberEpoch());
+ assertEquals(11, updatedMember.nextMemberEpoch());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2, 3),
+ mkTopicAssignment(topicId2, 4, 5, 6)
+ ), updatedMember.assignedPartitions());
+ assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingRevocation());
+ assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingAssignment());
+ }
+
+ private static Stream<Arguments> ownedTopicPartitionsArguments() {
+ return Stream.of(
+ // Field not set in the heartbeat request.
+ null,
+ // Owned partitions does not match the assigned partitions.
+ Collections.emptyList()
+ ).map(Arguments::of);
+ }
+
+ @ParameterizedTest
+ @MethodSource("ownedTopicPartitionsArguments")
+ public void testTransitionFromRevokeToRevoke(
+ List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions
+ ) {
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setNextMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId1, 3),
+ mkTopicAssignment(topicId2, 6)))
+ .setPartitionsPendingRevocation(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2),
+ mkTopicAssignment(topicId2, 4, 5)))
+ .setPartitionsPendingAssignment(mkAssignment(
+ mkTopicAssignment(topicId1, 4, 5),
+ mkTopicAssignment(topicId2, 7, 8)))
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.REVOKING, member.state());
+
+ Assignment targetAssignment = new Assignment(mkAssignment(
+ mkTopicAssignment(topicId1, 3, 4, 5),
+ mkTopicAssignment(topicId2, 6, 7, 8)
+ ));
+
+ ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(11, targetAssignment)
+ .withCurrentPartitionEpoch((topicId, partitionId) -> -1)
+ .withOwnedTopicPartitions(ownedTopicPartitions)
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state());
+ assertEquals(10, updatedMember.previousMemberEpoch());
+ assertEquals(10, updatedMember.memberEpoch());
+ assertEquals(11, updatedMember.nextMemberEpoch());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId1, 3),
+ mkTopicAssignment(topicId2, 6)
+ ), updatedMember.assignedPartitions());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2),
+ mkTopicAssignment(topicId2, 4, 5)
+ ), updatedMember.partitionsPendingRevocation());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId1, 4, 5),
+ mkTopicAssignment(topicId2, 7, 8)
+ ), updatedMember.partitionsPendingAssignment());
+ }
+
+ @Test
+ public void testTransitionFromRevokeToAssigning() {
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setNextMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId1, 3),
+ mkTopicAssignment(topicId2, 6)))
+ .setPartitionsPendingRevocation(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2),
+ mkTopicAssignment(topicId2, 4, 5)))
+ .setPartitionsPendingAssignment(mkAssignment(
+ mkTopicAssignment(topicId1, 4, 5),
+ mkTopicAssignment(topicId2, 7, 8)))
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.REVOKING, member.state());
+
+ Assignment targetAssignment = new Assignment(mkAssignment(
+ mkTopicAssignment(topicId1, 3, 4, 5),
+ mkTopicAssignment(topicId2, 6, 7, 8)
+ ));
+
+ ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(11, targetAssignment)
+ .withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+ .withOwnedTopicPartitions(requestFromAssignment(mkAssignment(
+ mkTopicAssignment(topicId1, 3),
+ mkTopicAssignment(topicId2, 6))))
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state());
+ assertEquals(10, updatedMember.previousMemberEpoch());
+ assertEquals(11, updatedMember.memberEpoch());
+ assertEquals(11, updatedMember.nextMemberEpoch());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId1, 3),
+ mkTopicAssignment(topicId2, 6)
+ ), updatedMember.assignedPartitions());
+ assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingRevocation());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId1, 4, 5),
+ mkTopicAssignment(topicId2, 7, 8)
+ ), updatedMember.partitionsPendingAssignment());
+ }
+
+ @Test
+ public void testTransitionFromRevokeToStable() {
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setNextMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId1, 3),
+ mkTopicAssignment(topicId2, 6)))
+ .setPartitionsPendingRevocation(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2),
+ mkTopicAssignment(topicId2, 4, 5)))
+ .setPartitionsPendingAssignment(mkAssignment(
+ mkTopicAssignment(topicId1, 4, 5),
+ mkTopicAssignment(topicId2, 7, 8)))
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.REVOKING, member.state());
+
+ Assignment targetAssignment = new Assignment(mkAssignment(
+ mkTopicAssignment(topicId1, 3, 4, 5),
+ mkTopicAssignment(topicId2, 6, 7, 8)
+ ));
+
+ ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(11, targetAssignment)
+ .withCurrentPartitionEpoch((topicId, partitionId) -> -1)
+ .withOwnedTopicPartitions(requestFromAssignment(mkAssignment(
+ mkTopicAssignment(topicId1, 3),
+ mkTopicAssignment(topicId2, 6))))
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.STABLE, updatedMember.state());
+ assertEquals(10, updatedMember.previousMemberEpoch());
+ assertEquals(11, updatedMember.memberEpoch());
+ assertEquals(11, updatedMember.nextMemberEpoch());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId1, 3, 4, 5),
+ mkTopicAssignment(topicId2, 6, 7, 8)
+ ), updatedMember.assignedPartitions());
+ assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingRevocation());
+ assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingAssignment());
+ }
+
+ @Test
+ public void testTransitionFromRevokeToStableWhenPartitionsPendingRevocationAreReassignedBeforeBeingRevoked() {
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setNextMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId1, 3),
+ mkTopicAssignment(topicId2, 6)))
+ .setPartitionsPendingRevocation(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2),
+ mkTopicAssignment(topicId2, 4, 5)))
+ .setPartitionsPendingAssignment(mkAssignment(
+ mkTopicAssignment(topicId1, 4, 5),
+ mkTopicAssignment(topicId2, 7, 8)))
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.REVOKING, member.state());
+
+ // A new target assignment is computed (epoch 12) before the partitions
+ // pending revocation are revoked by the member and those partitions
+ // have been reassigned to the member. In this case, the member
+ // can keep them a jump to epoch 12.
+ Assignment targetAssignment = new Assignment(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2, 3),
+ mkTopicAssignment(topicId2, 4, 5, 6)
+ ));
+
+ ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(12, targetAssignment)
+ .withCurrentPartitionEpoch((topicId, partitionId) -> -1)
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.STABLE, updatedMember.state());
+ assertEquals(10, updatedMember.previousMemberEpoch());
+ assertEquals(12, updatedMember.memberEpoch());
+ assertEquals(12, updatedMember.nextMemberEpoch());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2, 3),
+ mkTopicAssignment(topicId2, 4, 5, 6)
+ ), updatedMember.assignedPartitions());
+ assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingRevocation());
+ assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingAssignment());
+ }
+
+ @Test
+ public void testTransitionFromAssigningToAssigning() {
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(11)
+ .setNextMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId1, 3),
+ mkTopicAssignment(topicId2, 6)))
+ .setPartitionsPendingAssignment(mkAssignment(
+ mkTopicAssignment(topicId1, 4, 5),
+ mkTopicAssignment(topicId2, 7, 8)))
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, member.state());
+
+ Assignment targetAssignment = new Assignment(mkAssignment(
+ mkTopicAssignment(topicId1, 3, 4, 5),
+ mkTopicAssignment(topicId2, 6, 7, 8)
+ ));
+
+ ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(11, targetAssignment)
+ .withCurrentPartitionEpoch((topicId, partitionId) -> {
+ if (topicId.equals(topicId1))
+ return -1;
+ else
+ return 10;
+ })
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state());
+ assertEquals(10, updatedMember.previousMemberEpoch());
+ assertEquals(11, updatedMember.memberEpoch());
+ assertEquals(11, updatedMember.nextMemberEpoch());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId1, 3, 4, 5),
+ mkTopicAssignment(topicId2, 6)
+ ), updatedMember.assignedPartitions());
+ assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingRevocation());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId2, 7, 8)
+ ), updatedMember.partitionsPendingAssignment());
+ }
+
+ @Test
+ public void testTransitionFromAssigningToStable() {
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(11)
+ .setNextMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId1, 3),
+ mkTopicAssignment(topicId2, 6)))
+ .setPartitionsPendingAssignment(mkAssignment(
+ mkTopicAssignment(topicId1, 4, 5),
+ mkTopicAssignment(topicId2, 7, 8)))
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, member.state());
+
+ Assignment targetAssignment = new Assignment(mkAssignment(
+ mkTopicAssignment(topicId1, 3, 4, 5),
+ mkTopicAssignment(topicId2, 6, 7, 8)
+ ));
+
+ ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(11, targetAssignment)
+ .withCurrentPartitionEpoch((topicId, partitionId) -> -1)
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.STABLE, updatedMember.state());
+ assertEquals(10, updatedMember.previousMemberEpoch());
+ assertEquals(11, updatedMember.memberEpoch());
+ assertEquals(11, updatedMember.nextMemberEpoch());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId1, 3, 4, 5),
+ mkTopicAssignment(topicId2, 6, 7, 8)
+ ), updatedMember.assignedPartitions());
+ assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingRevocation());
+ assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingAssignment());
+ }
+
+ @Test
+ public void testTransitionFromStableToStable() {
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(11)
+ .setNextMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId1, 3, 4, 5),
+ mkTopicAssignment(topicId2, 6, 7, 8)))
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+ Assignment targetAssignment = new Assignment(mkAssignment(
+ mkTopicAssignment(topicId1, 3, 4, 5),
+ mkTopicAssignment(topicId2, 6, 7, 8)
+ ));
+
+ ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(11, targetAssignment)
+ .withCurrentPartitionEpoch((topicId, partitionId) -> -1)
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.STABLE, updatedMember.state());
+ assertEquals(11, updatedMember.previousMemberEpoch());
+ assertEquals(11, updatedMember.memberEpoch());
+ assertEquals(11, updatedMember.nextMemberEpoch());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId1, 3, 4, 5),
+ mkTopicAssignment(topicId2, 6, 7, 8)
+ ), updatedMember.assignedPartitions());
+ assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingRevocation());
+ assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingAssignment());
+ }
+
+ @Test
+ public void testNewTargetRestartReconciliation() {
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setNextMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId1, 3),
+ mkTopicAssignment(topicId2, 6)))
+ .setPartitionsPendingRevocation(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2),
+ mkTopicAssignment(topicId2, 4, 5)))
+ .setPartitionsPendingAssignment(mkAssignment(
+ mkTopicAssignment(topicId1, 4, 5),
+ mkTopicAssignment(topicId2, 7, 8)))
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.REVOKING, member.state());
+
+ Assignment targetAssignment = new Assignment(mkAssignment(
+ mkTopicAssignment(topicId1, 6, 7, 8),
+ mkTopicAssignment(topicId2, 9, 10, 11)
+ ));
+
+ ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(12, targetAssignment)
+ .withCurrentPartitionEpoch((topicId, partitionId) -> -1)
+ .build();
+
+ assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state());
+ assertEquals(10, updatedMember.previousMemberEpoch());
+ assertEquals(10, updatedMember.memberEpoch());
+ assertEquals(12, updatedMember.nextMemberEpoch());
+ assertEquals(Collections.emptyMap(), updatedMember.assignedPartitions());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2, 3),
+ mkTopicAssignment(topicId2, 4, 5, 6)
+ ), updatedMember.partitionsPendingRevocation());
+ assertEquals(mkAssignment(
+ mkTopicAssignment(topicId1, 6, 7, 8),
+ mkTopicAssignment(topicId2, 9, 10, 11)
+ ), updatedMember.partitionsPendingAssignment());
+ }
+
+ private static List<ConsumerGroupHeartbeatRequestData.TopicPartitions> requestFromAssignment(
+ Map<Uuid, Set<Integer>> assignment
+ ) {
+ List<ConsumerGroupHeartbeatRequestData.TopicPartitions> topicPartitions = new ArrayList<>();
+
+ assignment.forEach((topicId, partitions) -> {
+ ConsumerGroupHeartbeatRequestData.TopicPartitions topic = new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(new ArrayList<>(partitions));
+ topicPartitions.add(topic);
+ });
+
+ return topicPartitions;
+ }
+}