You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/09/13 12:08:03 UTC

[kafka] branch trunk updated: KAFKA-15275 - Client state machine basic components, states and initial transitions (#14323)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0e47fa75371 KAFKA-15275 - Client state machine basic components, states and initial transitions (#14323)
0e47fa75371 is described below

commit 0e47fa75371dda275de73610b414c16a639895f3
Author: Lianet Magrans <98...@users.noreply.github.com>
AuthorDate: Wed Sep 13 08:07:56 2023 -0400

    KAFKA-15275 - Client state machine basic components, states and initial transitions (#14323)
    
    Initial definition of the core components for maintaining group membership on the client following the new consumer group protocol.
    
    This PR includes:
    - Membership management for keeping member state and assignment, based on the heartbeat responses received.
    - Assignor selection logic to support server side assignors.
    This only includes the basic initial states and transitions, to be extended as we implement the protocol.
    
    This is intended to be used from the heartbeat and assignment requests manager that actually build and process the heartbeat and assignment related requests.
    
    Reviewers: Philip Nee <pn...@confluent.io>, Kirk True <kt...@confluent.io>, David Jacot <dj...@confluent.io>
---
 .../consumer/internals/AssignorSelection.java      |  84 ++++++++
 .../clients/consumer/internals/MemberState.java    |  82 +++++++
 .../consumer/internals/MembershipManager.java      |  62 ++++++
 .../consumer/internals/MembershipManagerImpl.java  | 240 +++++++++++++++++++++
 .../clients/consumer/AssignorSelectionTest.java    |  56 +++++
 .../internals/MembershipManagerImplTest.java       | 202 +++++++++++++++++
 6 files changed, 726 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
new file mode 100644
index 00000000000..1d8d0c8cc51
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.Objects;
+
+/**
+ * Selection of the type of assignor used by a member to get partitions assigned as part of a
+ * consumer group. Currently supported assignors are:
+ * <ul>
+ *     <li>SERVER assignors</li>
+ * </ul>
+ * <p>
+ * Server assignors include a name of the server assignor selected, ex. uniform, range.
+ * <p>
+ * Instances are immutable and can only be created through the static factory methods.
+ */
+public class AssignorSelection {
+    public enum Type { SERVER }
+
+    private final AssignorSelection.Type type;
+    // Final so that equals/hashCode cannot change after the selection is handed out.
+    private final String serverAssignor;
+
+    /**
+     * @param type           Type of assignor selected. Only {@link Type#SERVER} is supported.
+     * @param serverAssignor Name of the server assignor selected.
+     * @throws IllegalArgumentException If the assignor type is not supported.
+     */
+    private AssignorSelection(Type type, String serverAssignor) {
+        if (type != Type.SERVER) {
+            throw new IllegalArgumentException("Unsupported assignor type " + type);
+        }
+        this.type = type;
+        this.serverAssignor = serverAssignor;
+    }
+
+    /**
+     * Create a selection of a server side assignor.
+     *
+     * @param serverAssignor Name of the server assignor to select.
+     * @return Selection of type {@link Type#SERVER} for the given assignor name.
+     * @throws IllegalArgumentException If the assignor name is null or empty.
+     */
+    public static AssignorSelection newServerAssignor(String serverAssignor) {
+        if (serverAssignor == null) {
+            throw new IllegalArgumentException("Selected server assignor name cannot be null");
+        }
+        if (serverAssignor.isEmpty()) {
+            throw new IllegalArgumentException("Selected server assignor name cannot be empty");
+        }
+        return new AssignorSelection(Type.SERVER, serverAssignor);
+    }
+
+    /**
+     * @return Selection used when none is provided: the "uniform" server assignor.
+     */
+    public static AssignorSelection defaultAssignor() {
+        // TODO: review default selection
+        return new AssignorSelection(Type.SERVER, "uniform");
+    }
+
+    /**
+     * @return Name of the server assignor selected.
+     */
+    public String serverAssignor() {
+        return this.serverAssignor;
+    }
+
+    /**
+     * @return Type of the assignor selected.
+     */
+    public Type type() {
+        return this.type;
+    }
+
+    @Override
+    public boolean equals(Object assignorSelection) {
+        if (this == assignorSelection) return true;
+        if (assignorSelection == null || getClass() != assignorSelection.getClass()) return false;
+        AssignorSelection other = (AssignorSelection) assignorSelection;
+        return Objects.equals(other.type, this.type) &&
+                Objects.equals(other.serverAssignor, this.serverAssignor);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(type, serverAssignor);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Assignor selection {type:%s, name:%s}", type, serverAssignor);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
new file mode 100644
index 00000000000..4da2eb54e5c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * States of a member of a consumer group. Each state keeps the list of states it can be
+ * entered from, see {@link #getPreviousValidStates()}.
+ */
+public enum MemberState {
+
+    /**
+     * The member is not part of a consumer group: either it has not joined yet, or it was
+     * fenced and has to re-join.
+     */
+    UNJOINED,
+
+    /**
+     * A new target assignment was received (partitions may have been assigned or revoked) and
+     * the member is processing it. While in this state, the member will invoke the user
+     * callbacks for onPartitionsAssigned or onPartitionsRevoked, and then make the new
+     * assignment effective.
+     */
+    // TODO: determine if separate state will be needed for assign/revoke (not for now)
+    RECONCILING,
+
+    /**
+     * The member is active in a group (heartbeating) and has processed every assignment it
+     * received.
+     */
+    STABLE,
+
+    /**
+     * State the member moves to when the broker answers with a
+     * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or
+     * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} error.
+     * This is a recoverable state, where the member gives up its partitions by invoking the
+     * user callbacks for onPartitionsLost, and then transitions to {@link #UNJOINED} to rejoin
+     * the group as a new member.
+     */
+    FENCED,
+
+    /**
+     * The member hit an unrecoverable error.
+     */
+    FAILED;
+
+    /**
+     * States from which a transition into this state is accepted. Starts out empty and is
+     * populated by the static initializer below, once all constants exist.
+     */
+    private List<MemberState> previousValidStates = new ArrayList<>();
+
+    static {
+        // Valid state transitions, written as "target state <- allowed source states"
+        UNJOINED.previousValidStates = Arrays.asList(FENCED);
+
+        RECONCILING.previousValidStates = Arrays.asList(STABLE, UNJOINED);
+
+        STABLE.previousValidStates = Arrays.asList(UNJOINED, RECONCILING);
+
+        FENCED.previousValidStates = Arrays.asList(STABLE, RECONCILING);
+
+        FAILED.previousValidStates = Arrays.asList(STABLE, RECONCILING);
+    }
+
+    /**
+     * @return States from which the member is allowed to transition into this state.
+     */
+    public List<MemberState> getPreviousValidStates() {
+        return previousValidStates;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
new file mode 100644
index 00000000000..a0a72e56336
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+
+import java.util.Optional;
+
+/**
+ * Manages group membership for a single member.
+ * Responsible for:
+ * <ul>
+ *     <li>Keeping member state</li>
+ *     <li>Keeping assignment for the member</li>
+ *     <li>Computing assignment for the group if the member is required to do so</li>
+ * </ul>
+ */
+public interface MembershipManager {
+
+    /**
+     * Returns the ID of the consumer group the member belongs to
+     */
+    String groupId();
+
+    /**
+     * Returns the group instance ID of the member, if one is defined
+     */
+    Optional<String> groupInstanceId();
+
+    /**
+     * Returns the ID of the member
+     */
+    String memberId();
+
+    /**
+     * Returns the current epoch of the member
+     */
+    int memberEpoch();
+
+    /**
+     * Returns the current {@link MemberState} of the member
+     */
+    MemberState state();
+
+    /**
+     * Update the current state of the member based on a heartbeat response
+     */
+    void updateState(ConsumerGroupHeartbeatResponseData response);
+
+    /**
+     * Returns the {@link AssignorSelection} for the member
+     */
+    AssignorSelection assignorSelection();
+
+    /**
+     * Returns the current assignment for the member
+     */
+    ConsumerGroupHeartbeatResponseData.Assignment assignment();
+
+    /**
+     * Update the assignment for the member, indicating that the provided assignment is the new
+     * current assignment.
+     */
+    void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment);
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
new file mode 100644
index 00000000000..b23738dda77
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.Optional;
+
+/**
+ * Membership manager that maintains group membership for a single member following the new
+ * consumer group protocol.
+ * <p>
+ * This keeps membership state and assignment updated in-memory, based on the heartbeat responses
+ * the member receives. It is also responsible for computing assignment for the group based on
+ * the metadata, if the member has been selected by the broker to do so.
+ */
+public class MembershipManagerImpl implements MembershipManager {
+
+    private final String groupId;
+    private final Optional<String> groupInstanceId;
+    private String memberId;
+    private int memberEpoch;
+    private MemberState state;
+    private AssignorSelection assignorSelection;
+
+    /**
+     * Assignment that the member received from the server and successfully processed.
+     */
+    private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+    /**
+     * Assignment that the member received from the server but hasn't completely processed
+     * yet.
+     */
+    private Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment;
+    /**
+     * Latest assignment that the member received from the server while a {@link #targetAssignment}
+     * was in process.
+     */
+    private Optional<ConsumerGroupHeartbeatResponseData.Assignment> nextTargetAssignment;
+
+    /**
+     * Create a manager for a member without group instance ID, using the default assignor
+     * selection.
+     *
+     * @param groupId ID of the consumer group
+     */
+    public MembershipManagerImpl(String groupId) {
+        this(groupId, null, null);
+    }
+
+    /**
+     * Create a manager for a member that starts in the {@link MemberState#UNJOINED} state, with
+     * no target assignments.
+     *
+     * @param groupId           ID of the consumer group
+     * @param groupInstanceId   Group instance ID of the member, or null if it has none
+     * @param assignorSelection Assignor selection for the member, or null to use
+     *                          {@link AssignorSelection#defaultAssignor()}
+     */
+    public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSelection assignorSelection) {
+        this.groupId = groupId;
+        this.state = MemberState.UNJOINED;
+        if (assignorSelection == null) {
+            setAssignorSelection(AssignorSelection.defaultAssignor());
+        } else {
+            setAssignorSelection(assignorSelection);
+        }
+        this.groupInstanceId = Optional.ofNullable(groupInstanceId);
+        this.targetAssignment = Optional.empty();
+        this.nextTargetAssignment = Optional.empty();
+    }
+
+    /**
+     * Update assignor selection for the member.
+     *
+     * @param assignorSelection New assignor selection
+     * @throws IllegalArgumentException If the provided assignor selection is null
+     */
+    public void setAssignorSelection(AssignorSelection assignorSelection) {
+        if (assignorSelection == null) {
+            throw new IllegalArgumentException("Assignor selection cannot be null");
+        }
+        this.assignorSelection = assignorSelection;
+    }
+
+    /**
+     * Move the member to the given state. Remaining in the same state is always accepted; any
+     * other transition is only accepted if the current state is one of the
+     * {@link MemberState#getPreviousValidStates()} of the next state.
+     *
+     * @param nextState State to transition to
+     * @throws IllegalStateException If the transition from the current state is not valid
+     */
+    private void transitionTo(MemberState nextState) {
+        if (!this.state.equals(nextState) && !nextState.getPreviousValidStates().contains(state)) {
+            // TODO: handle invalid state transition
+            throw new IllegalStateException(String.format("Invalid state transition from %s to %s",
+                    state, nextState));
+        }
+        this.state = nextState;
+    }
+
+    @Override
+    public String groupId() {
+        return groupId;
+    }
+
+    @Override
+    public Optional<String> groupInstanceId() {
+        return groupInstanceId;
+    }
+
+    @Override
+    public String memberId() {
+        return memberId;
+    }
+
+    @Override
+    public int memberEpoch() {
+        return memberEpoch;
+    }
+
+    /**
+     * Update the member based on a heartbeat response:
+     * <ul>
+     *     <li>On success, take the member ID and epoch from the response, keep the assignment
+     *     (if any) as target to reconcile, and transition to {@link MemberState#STABLE} or
+     *     {@link MemberState#RECONCILING} depending on whether there are assignments pending.</li>
+     *     <li>On {@link Errors#FENCED_MEMBER_EPOCH} or {@link Errors#UNKNOWN_MEMBER_ID}, reset
+     *     the member epoch and transition to {@link MemberState#FENCED}.</li>
+     *     <li>On {@link Errors#UNRELEASED_INSTANCE_ID}, transition to
+     *     {@link MemberState#FAILED}.</li>
+     *     <li>Any other error currently leaves the member untouched.</li>
+     * </ul>
+     *
+     * @param response Heartbeat response received from the broker
+     * @throws IllegalStateException If the response requires a state transition that is not
+     *                               valid from the current state
+     */
+    @Override
+    public void updateState(ConsumerGroupHeartbeatResponseData response) {
+        if (response.errorCode() == Errors.NONE.code()) {
+            this.memberId = response.memberId();
+            this.memberEpoch = response.memberEpoch();
+            ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment();
+            if (assignment != null) {
+                setTargetAssignment(assignment);
+            }
+            maybeTransitionToStable();
+        } else {
+            if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() || response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) {
+                resetEpoch();
+                transitionTo(MemberState.FENCED);
+            } else if (response.errorCode() == Errors.UNRELEASED_INSTANCE_ID.code()) {
+                transitionTo(MemberState.FAILED);
+            }
+            // TODO: handle other errors here to update state accordingly, mainly making the
+            //  distinction between the recoverable errors and the fatal ones, that should fail
+            //  the member
+        }
+    }
+
+    /**
+     * Transition to {@link MemberState#STABLE} only if there are no target assignments left to
+     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     *
+     * @return true if the member ended up in the {@link MemberState#STABLE} state
+     */
+    private boolean maybeTransitionToStable() {
+        if (!hasPendingTargetAssignment()) {
+            transitionTo(MemberState.STABLE);
+        } else {
+            transitionTo(MemberState.RECONCILING);
+        }
+        return state.equals(MemberState.STABLE);
+    }
+
+    /**
+     * Keep a newly received assignment: as the target assignment if none is being processed, or
+     * as the next target assignment otherwise (replacing any previous next target).
+     */
+    private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) {
+        if (!targetAssignment.isPresent()) {
+            targetAssignment = Optional.of(newTargetAssignment);
+        } else {
+            // Keep the latest next target assignment
+            nextTargetAssignment = Optional.of(newTargetAssignment);
+        }
+    }
+
+    private boolean hasPendingTargetAssignment() {
+        return targetAssignment.isPresent() || nextTargetAssignment.isPresent();
+    }
+
+    /**
+     * Update state and assignment as the member has successfully processed a new target
+     * assignment.
+     * This makes the given assignment the new current assignment. The member becomes
+     * {@link MemberState#STABLE} only if no other assignment was received in the meantime;
+     * otherwise it remains {@link MemberState#RECONCILING} with that assignment as target.
+     *
+     * @param assignment Target assignment the member was able to successfully process
+     */
+    public void onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
+        // updateAssignment already picks the resulting state. Forcing STABLE here would leave
+        // the member STABLE while a promoted next target assignment is still to be reconciled.
+        updateAssignment(assignment);
+    }
+
+    /**
+     * Update state and member info as the member was not able to process the assignment, due to
+     * errors in the execution of the user-provided callbacks.
+     *
+     * @param error Exception found during the execution of the user-provided callbacks
+     */
+    public void onAssignmentProcessFailure(Throwable error) {
+        transitionTo(MemberState.FAILED);
+        // TODO: update member info appropriately, to clear up whatever shouldn't be kept in
+        //  this unrecoverable state
+    }
+
+    private void resetEpoch() {
+        this.memberEpoch = 0;
+    }
+
+    @Override
+    public MemberState state() {
+        return state;
+    }
+
+    @Override
+    public AssignorSelection assignorSelection() {
+        return this.assignorSelection;
+    }
+
+    @Override
+    public ConsumerGroupHeartbeatResponseData.Assignment assignment() {
+        return this.currentAssignment;
+    }
+
+    // VisibleForTesting
+    Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment() {
+        return targetAssignment;
+    }
+
+    // VisibleForTesting
+    Optional<ConsumerGroupHeartbeatResponseData.Assignment> nextTargetAssignment() {
+        return nextTargetAssignment;
+    }
+
+    /**
+     * Set the current assignment for the member. This indicates that the reconciliation of the
+     * target assignment has been successfully completed.
+     * This will clear the {@link #targetAssignment}, and take on the
+     * {@link #nextTargetAssignment} if any.
+     *
+     * @param assignment Assignment that has been successfully processed as part of the
+     *                   reconciliation process.
+     */
+    @Override
+    public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
+        this.currentAssignment = assignment;
+        // Promote the next target (possibly empty) to target, and clear the next target.
+        targetAssignment = nextTargetAssignment;
+        nextTargetAssignment = Optional.empty();
+        maybeTransitionToStable();
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
new file mode 100644
index 00000000000..bdae8adcbe8
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.AssignorSelection;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class AssignorSelectionTest {
+
+    @Test
+    public void testServerAssignorCannotBeNullOrEmptyIfSelected() {
+        // Neither a missing nor a blank name is accepted for a server assignor
+        assertThrows(IllegalArgumentException.class, () -> AssignorSelection.newServerAssignor(null));
+        assertThrows(IllegalArgumentException.class, () -> AssignorSelection.newServerAssignor(""));
+    }
+
+    @Test
+    public void testEquals() {
+        // Server assignors: equality is driven by the assignor name
+        AssignorSelection range = AssignorSelection.newServerAssignor("range");
+        AssignorSelection sameRange = AssignorSelection.newServerAssignor("range");
+        AssignorSelection uniform = AssignorSelection.newServerAssignor("uniform");
+
+        assertEquals(range, range);
+        assertEquals(range, sameRange);
+        assertNotEquals(range, uniform);
+        assertNotEquals(range, null);
+    }
+
+    @Test
+    public void testServerAssignorSelection() {
+        String expectedName = "uniform";
+        AssignorSelection serverSelection = AssignorSelection.newServerAssignor(expectedName);
+
+        assertEquals(AssignorSelection.Type.SERVER, serverSelection.type());
+        assertEquals(expectedName, serverSelection.serverAssignor());
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
new file mode 100644
index 00000000000..8c20b2837d5
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class MembershipManagerImplTest {
+
+    private static final String GROUP_ID = "test-group";
+    private static final String MEMBER_ID = "test-member-1";
+    private static final int MEMBER_EPOCH = 1;
+
+    @Test
+    public void testMembershipManagerDefaultAssignor() {
+        MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
+        assertEquals(AssignorSelection.defaultAssignor(), membershipManager.assignorSelection());
+
+        membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", null);
+        assertEquals(AssignorSelection.defaultAssignor(), membershipManager.assignorSelection());
+    }
+
+    @Test
+    public void testMembershipManagerAssignorSelectionUpdate() {
+        AssignorSelection firstAssignorSelection = AssignorSelection.newServerAssignor("uniform");
+        MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1",
+                firstAssignorSelection);
+        assertEquals(firstAssignorSelection, membershipManager.assignorSelection());
+
+        AssignorSelection secondAssignorSelection = AssignorSelection.newServerAssignor("range");
+        membershipManager.setAssignorSelection(secondAssignorSelection);
+        assertEquals(secondAssignorSelection, membershipManager.assignorSelection());
+
+        assertThrows(IllegalArgumentException.class,
+                () -> membershipManager.setAssignorSelection(null));
+    }
+
+    @Test
+    public void testMembershipManagerInitSupportsEmptyGroupInstanceId() {
+        // Both ways of not providing a group instance ID must result in an empty value
+        MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
+        assertFalse(membershipManager.groupInstanceId().isPresent());
+
+        membershipManager = new MembershipManagerImpl(GROUP_ID, null, AssignorSelection.defaultAssignor());
+        assertFalse(membershipManager.groupInstanceId().isPresent());
+    }
+
+    @Test
+    public void testTransitionToReconcilingOnlyIfAssignmentReceived() {
+        MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
+        assertEquals(MemberState.UNJOINED, membershipManager.state());
+
+        ConsumerGroupHeartbeatResponse responseWithoutAssignment =
+                createConsumerGroupHeartbeatResponse(null);
+        membershipManager.updateState(responseWithoutAssignment.data());
+        assertNotEquals(MemberState.RECONCILING, membershipManager.state());
+
+        ConsumerGroupHeartbeatResponse responseWithAssignment =
+                createConsumerGroupHeartbeatResponse(createAssignment());
+        membershipManager.updateState(responseWithAssignment.data());
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+    }
+
+    @ParameterizedTest
+    @EnumSource(Errors.class)
+    public void testMemberIdAndEpochResetOnErrors(Errors error) {
+        // Join the group first, so that the member is STABLE with a valid ID and epoch
+        MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
+        ConsumerGroupHeartbeatResponse heartbeatResponse =
+                createConsumerGroupHeartbeatResponse(null);
+        membershipManager.updateState(heartbeatResponse.data());
+        assertEquals(MemberState.STABLE, membershipManager.state());
+        assertEquals(MEMBER_ID, membershipManager.memberId());
+        assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
+
+        if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.FENCED_MEMBER_EPOCH) {
+            // Should reset member epoch and keep member id. Use the error under test, so that
+            // both fencing errors are actually exercised.
+            ConsumerGroupHeartbeatResponse heartbeatResponseWithMemberIdError =
+                    createConsumerGroupHeartbeatResponseWithError(error);
+            membershipManager.updateState(heartbeatResponseWithMemberIdError.data());
+
+            assertEquals(MemberState.FENCED, membershipManager.state());
+            assertFalse(membershipManager.memberId().isEmpty());
+            assertEquals(0, membershipManager.memberEpoch());
+        } else {
+            // Should not reset member id or epoch
+            ConsumerGroupHeartbeatResponse heartbeatResponseWithError =
+                    createConsumerGroupHeartbeatResponseWithError(error);
+            membershipManager.updateState(heartbeatResponseWithError.data());
+
+            assertFalse(membershipManager.memberId().isEmpty());
+            assertNotEquals(0, membershipManager.memberEpoch());
+        }
+    }
+
+    @Test
+    public void testUpdateAssignment() {
+        MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
+        ConsumerGroupHeartbeatResponseData.Assignment newAssignment = createAssignment();
+        ConsumerGroupHeartbeatResponse heartbeatResponse =
+                createConsumerGroupHeartbeatResponse(newAssignment);
+        membershipManager.updateState(heartbeatResponse.data());
+
+        // Target assignment should be in the process of being reconciled
+        checkAssignments(membershipManager, null, newAssignment, null);
+    }
+
+    @Test
+    public void testUpdateAssignmentReceivingAssignmentWhileAnotherInProcess() {
+        MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
+        ConsumerGroupHeartbeatResponseData.Assignment newAssignment1 = createAssignment();
+        membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment1).data());
+
+        // First target assignment received should be in the process of being reconciled
+        checkAssignments(membershipManager, null, newAssignment1, null);
+
+        // Second target assignment received while there is another one being reconciled
+        ConsumerGroupHeartbeatResponseData.Assignment newAssignment2 = createAssignment();
+        membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data());
+        checkAssignments(membershipManager, null, newAssignment1, newAssignment2);
+    }
+
+    @Test
+    public void testNextTargetAssignmentHoldsLatestAssignmentReceivedWhileAnotherInProcess() {
+        MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
+        ConsumerGroupHeartbeatResponseData.Assignment newAssignment1 = createAssignment();
+        membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment1).data());
+
+        // First target assignment received, remains in the process of being reconciled
+        checkAssignments(membershipManager, null, newAssignment1, null);
+
+        // Second target assignment received while there is another one being reconciled
+        ConsumerGroupHeartbeatResponseData.Assignment newAssignment2 = createAssignment();
+        membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data());
+        checkAssignments(membershipManager, null, newAssignment1, newAssignment2);
+
+        // If more assignments are received while there is one being reconciled, the most recent
+        // assignment received is kept as nextTargetAssignment
+        ConsumerGroupHeartbeatResponseData.Assignment newAssignment3 = createAssignment();
+        membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment3).data());
+        checkAssignments(membershipManager, null, newAssignment1, newAssignment3);
+    }
+
+    /**
+     * Assert the current, target and next target assignments kept by the manager. A null
+     * expected value means that no assignment is expected in that position.
+     */
+    private void checkAssignments(
+            MembershipManagerImpl membershipManager,
+            ConsumerGroupHeartbeatResponseData.Assignment expectedCurrentAssignment,
+            ConsumerGroupHeartbeatResponseData.Assignment expectedTargetAssignment,
+            ConsumerGroupHeartbeatResponseData.Assignment expectedNextTargetAssignment) {
+        assertEquals(expectedCurrentAssignment, membershipManager.assignment());
+        assertEquals(expectedTargetAssignment, membershipManager.targetAssignment().orElse(null));
+        assertEquals(expectedNextTargetAssignment, membershipManager.nextTargetAssignment().orElse(null));
+    }
+
+    /**
+     * Build a successful heartbeat response for {@link #MEMBER_ID} and {@link #MEMBER_EPOCH},
+     * carrying the given assignment (null for a response without assignment).
+     */
+    private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponse(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
+        return new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
+                .setErrorCode(Errors.NONE.code())
+                .setMemberId(MEMBER_ID)
+                .setMemberEpoch(MEMBER_EPOCH)
+                .setAssignment(assignment));
+    }
+
+    /**
+     * Build a heartbeat response for {@link #MEMBER_ID} carrying the given error code and a
+     * non-zero member epoch.
+     */
+    private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponseWithError(Errors error) {
+        return new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
+                .setErrorCode(error.code())
+                .setMemberId(MEMBER_ID)
+                .setMemberEpoch(5));
+    }
+
+    /**
+     * Build an assignment with partitions for two topics with random IDs.
+     */
+    private ConsumerGroupHeartbeatResponseData.Assignment createAssignment() {
+        return new ConsumerGroupHeartbeatResponseData.Assignment()
+                .setAssignedTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                                .setTopicId(Uuid.randomUuid())
+                                .setPartitions(Arrays.asList(0, 1, 2)),
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                                .setTopicId(Uuid.randomUuid())
+                                .setPartitions(Arrays.asList(3, 4, 5))
+                ));
+    }
+}