You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/09/29 00:30:07 UTC

[kafka] branch 3.0 updated: MINOR: optimize performAssignment to skip unnecessary check (#11218)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new e87176f  MINOR: optimize performAssignment to skip unnecessary check (#11218)
e87176f is described below

commit e87176fe423324cfcdb457df3f0c4eff6ca8803e
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Wed Sep 29 08:22:05 2021 +0800

    MINOR: optimize performAssignment to skip unnecessary check (#11218)
    
    Found this while reading the code. We did a "a little heavy" check each time after performing assignment, which is to compare the "assigned topics" set and the "subscribed topics" set, to see if there's any topics not existed in another set. Also, the "assigned topics" set is created by traversing all the assigned partitions, which will be a little heavy if partition numbers are large.
    
    However, as the comments described, it's a safe-guard for user-customized assignor, which might do assignment that we don't expected. In most cases, user will just use the in-product assignor, which guarantee that we only assign the topics from subscribed topics. Therefore, no need this check for in-product assignors.
    
    In this PR, I added an "in-product assignor names" list, and we'll in consumerCoordinator check if the assignor is one of in-product assignors, to decide if we need to do the additional check. Also add test for it.
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>, Guozhang Wang <gu...@confluent.io>
---
 .../kafka/clients/consumer/ConsumerConfig.java     | 15 ++++
 .../consumer/CooperativeStickyAssignor.java        |  3 +-
 .../kafka/clients/consumer/RangeAssignor.java      |  3 +-
 .../kafka/clients/consumer/RoundRobinAssignor.java |  3 +-
 .../kafka/clients/consumer/StickyAssignor.java     |  3 +-
 .../consumer/internals/ConsumerCoordinator.java    | 78 ++++++++++++-------
 .../internals/ConsumerCoordinatorTest.java         | 90 ++++++++++++++++++----
 7 files changed, 149 insertions(+), 46 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 783c97e..ca24c28 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
@@ -39,6 +40,10 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.kafka.clients.consumer.CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME;
+import static org.apache.kafka.clients.consumer.RangeAssignor.RANGE_ASSIGNOR_NAME;
+import static org.apache.kafka.clients.consumer.RoundRobinAssignor.ROUNDROBIN_ASSIGNOR_NAME;
+import static org.apache.kafka.clients.consumer.StickyAssignor.STICKY_ASSIGNOR_NAME;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
@@ -48,6 +53,16 @@ import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 public class ConsumerConfig extends AbstractConfig {
     private static final ConfigDef CONFIG;
 
+    // a list contains all the assignor names that only assign subscribed topics to consumer. Should be updated when new assignor added.
+    // This is to help optimize ConsumerCoordinator#performAssignment method
+    public static final List<String> ASSIGN_FROM_SUBSCRIBED_ASSIGNORS =
+        Collections.unmodifiableList(Arrays.asList(
+            RANGE_ASSIGNOR_NAME,
+            ROUNDROBIN_ASSIGNOR_NAME,
+            STICKY_ASSIGNOR_NAME,
+            COOPERATIVE_STICKY_ASSIGNOR_NAME
+        ));
+
     /*
      * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
      * THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
index 5f0bb0c..b2cca87 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.protocol.types.Type;
  * cooperative rebalancing. See the <a href="https://kafka.apache.org/documentation/#upgrade_240_notable">upgrade guide</a> for details.
  */
 public class CooperativeStickyAssignor extends AbstractStickyAssignor {
+    public static final String COOPERATIVE_STICKY_ASSIGNOR_NAME = "cooperative-sticky";
 
     // these schemas are used for preserving useful metadata for the assignment, such as the last stable generation
     private static final String GENERATION_KEY_NAME = "generation";
@@ -57,7 +58,7 @@ public class CooperativeStickyAssignor extends AbstractStickyAssignor {
 
     @Override
     public String name() {
-        return "cooperative-sticky";
+        return COOPERATIVE_STICKY_ASSIGNOR_NAME;
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
index 21eb47a..aec0d39 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
@@ -65,10 +65,11 @@ import java.util.Map;
  * </ul>
  */
 public class RangeAssignor extends AbstractPartitionAssignor {
+    public static final String RANGE_ASSIGNOR_NAME = "range";
 
     @Override
     public String name() {
-        return "range";
+        return RANGE_ASSIGNOR_NAME;
     }
 
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
index edecce8..2d6edea 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
@@ -98,6 +98,7 @@ import java.util.TreeSet;
  * </ul>
  */
 public class RoundRobinAssignor extends AbstractPartitionAssignor {
+    public static final String ROUNDROBIN_ASSIGNOR_NAME = "roundrobin";
 
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
@@ -138,7 +139,7 @@ public class RoundRobinAssignor extends AbstractPartitionAssignor {
 
     @Override
     public String name() {
-        return "roundrobin";
+        return ROUNDROBIN_ASSIGNOR_NAME;
     }
 
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
index 77a61df..787c432 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
@@ -174,6 +174,7 @@ import org.apache.kafka.common.utils.CollectionUtils;
  * {@link ConsumerPartitionAssignor.RebalanceProtocol} for a detailed explanation of cooperative rebalancing.
  */
 public class StickyAssignor extends AbstractStickyAssignor {
+    public static final String STICKY_ASSIGNOR_NAME = "sticky";
 
     // these schemas are used for preserving consumer's previously assigned partitions
     // list and sending it as user data to the leader during a rebalance
@@ -196,7 +197,7 @@ public class StickyAssignor extends AbstractStickyAssignor {
 
     @Override
     public String name() {
-        return "sticky";
+        return STICKY_ASSIGNOR_NAME;
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 68cf8a9..67ad51e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -81,6 +81,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ASSIGN_FROM_SUBSCRIBED_ASSIGNORS;
+
 /**
  * This class manages the coordination process with the consumer coordinator.
  */
@@ -556,6 +558,53 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         maybeUpdateSubscriptionMetadata();
     }
 
+    private boolean isAssignFromSubscribedTopicsAssignor(String name) {
+        return ASSIGN_FROM_SUBSCRIBED_ASSIGNORS.contains(name);
+    }
+
+    /**
+     * user-customized assignor may have created some topics that are not in the subscription list
+     * and assign their partitions to the members; in this case we would like to update the leader's
+     * own metadata with the newly added topics so that it will not trigger a subsequent rebalance
+     * when these topics gets updated from metadata refresh.
+     *
+     * We skip the check for in-product assignors since this will not happen in in-product assignors.
+     *
+     * TODO: this is a hack and not something we want to support long-term unless we push regex into the protocol
+     *       we may need to modify the ConsumerPartitionAssignor API to better support this case.
+     *
+     * @param assignorName          the selected assignor name
+     * @param assignments           the assignments after assignor assigned
+     * @param allSubscribedTopics   all consumers' subscribed topics
+     */
+    private void maybeUpdateGroupSubscription(String assignorName,
+                                              Map<String, Assignment> assignments,
+                                              Set<String> allSubscribedTopics) {
+        if (!isAssignFromSubscribedTopicsAssignor(assignorName)) {
+            Set<String> assignedTopics = new HashSet<>();
+            for (Assignment assigned : assignments.values()) {
+                for (TopicPartition tp : assigned.partitions())
+                    assignedTopics.add(tp.topic());
+            }
+
+            if (!assignedTopics.containsAll(allSubscribedTopics)) {
+                Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics);
+                notAssignedTopics.removeAll(assignedTopics);
+                log.warn("The following subscribed topics are not assigned to any members: {} ", notAssignedTopics);
+            }
+
+            if (!allSubscribedTopics.containsAll(assignedTopics)) {
+                Set<String> newlyAddedTopics = new HashSet<>(assignedTopics);
+                newlyAddedTopics.removeAll(allSubscribedTopics);
+                log.info("The following not-subscribed topics are assigned, and their metadata will be " +
+                    "fetched from the brokers: {}", newlyAddedTopics);
+
+                allSubscribedTopics.addAll(newlyAddedTopics);
+                updateGroupSubscription(allSubscribedTopics);
+            }
+        }
+    }
+
     @Override
     protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                         String assignmentStrategy,
@@ -592,34 +641,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             validateCooperativeAssignment(ownedPartitions, assignments);
         }
 
-        // user-customized assignor may have created some topics that are not in the subscription list
-        // and assign their partitions to the members; in this case we would like to update the leader's
-        // own metadata with the newly added topics so that it will not trigger a subsequent rebalance
-        // when these topics gets updated from metadata refresh.
-        //
-        // TODO: this is a hack and not something we want to support long-term unless we push regex into the protocol
-        //       we may need to modify the ConsumerPartitionAssignor API to better support this case.
-        Set<String> assignedTopics = new HashSet<>();
-        for (Assignment assigned : assignments.values()) {
-            for (TopicPartition tp : assigned.partitions())
-                assignedTopics.add(tp.topic());
-        }
-
-        if (!assignedTopics.containsAll(allSubscribedTopics)) {
-            Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics);
-            notAssignedTopics.removeAll(assignedTopics);
-            log.warn("The following subscribed topics are not assigned to any members: {} ", notAssignedTopics);
-        }
-
-        if (!allSubscribedTopics.containsAll(assignedTopics)) {
-            Set<String> newlyAddedTopics = new HashSet<>(assignedTopics);
-            newlyAddedTopics.removeAll(allSubscribedTopics);
-            log.info("The following not-subscribed topics are assigned, and their metadata will be " +
-                    "fetched from the brokers: {}", newlyAddedTopics);
-
-            allSubscribedTopics.addAll(assignedTopics);
-            updateGroupSubscription(allSubscribedTopics);
-        }
+        maybeUpdateGroupSubscription(assignor.name(), assignments, allSubscribedTopics);
 
         assignmentSnapshot = metadataSnapshot;
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 902ea99..e5aa804 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -77,6 +77,8 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -191,7 +193,8 @@ public abstract class ConsumerCoordinatorTest {
         this.coordinator = buildCoordinator(rebalanceConfig,
                                             metrics,
                                             assignors,
-                                            false);
+                                            false,
+                                            subscriptions);
     }
 
     private GroupRebalanceConfig buildRebalanceConfig(Optional<String> groupInstanceId) {
@@ -258,6 +261,63 @@ public abstract class ConsumerCoordinatorTest {
         return metrics.metrics().get(metrics.metricName(name, consumerId + groupId + "-coordinator-metrics"));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testPerformAssignmentShouldUpdateGroupSubscriptionAfterAssignmentIfNeeded() {
+        SubscriptionState mockSubscriptionState = Mockito.mock(SubscriptionState.class);
+
+        // the consumer only subscribed to "topic1"
+        Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
+
+        List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<>();
+        for (Map.Entry<String, List<String>> subscriptionEntry : memberSubscriptions.entrySet()) {
+            ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue());
+            ByteBuffer buf = ConsumerProtocol.serializeSubscription(subscription);
+            metadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
+                .setMemberId(subscriptionEntry.getKey())
+                .setMetadata(buf.array()));
+        }
+
+        // normal case: the assignment result will have partitions for only the subscribed topic: "topic1"
+        partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(t1p)));
+
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, false, mockSubscriptionState)) {
+            coordinator.performAssignment("1", partitionAssignor.name(), metadata);
+
+            ArgumentCaptor<Collection<String>> topicsCaptor = ArgumentCaptor.forClass(Collection.class);
+            // groupSubscribe should be only called 1 time, which is before assignment,
+            // because the assigned topics are the same as the subscribed topics
+            Mockito.verify(mockSubscriptionState, Mockito.times(1)).groupSubscribe(topicsCaptor.capture());
+
+            List<Collection<String>> capturedTopics = topicsCaptor.getAllValues();
+
+            // expected the final group subscribed topics to be updated to "topic1"
+            Set<String> expectedTopicsGotCalled = new HashSet<>(Arrays.asList(topic1));
+            assertEquals(expectedTopicsGotCalled, capturedTopics.get(0));
+        }
+
+        Mockito.clearInvocations(mockSubscriptionState);
+
+        // unsubscribed topic partition assigned case: the assignment result will have partitions for (1) subscribed topic: "topic1"
+        // and (2) the additional unsubscribed topic: "topic2". We should add "topic2" into group subscription list
+        partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(t1p, t2p)));
+
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, false, mockSubscriptionState)) {
+            coordinator.performAssignment("1", partitionAssignor.name(), metadata);
+
+            ArgumentCaptor<Collection<String>> topicsCaptor = ArgumentCaptor.forClass(Collection.class);
+            // groupSubscribe should be called 2 times, once before assignment, once after assignment
+            // (because the assigned topics are not the same as the subscribed topics)
+            Mockito.verify(mockSubscriptionState, Mockito.times(2)).groupSubscribe(topicsCaptor.capture());
+
+            List<Collection<String>> capturedTopics = topicsCaptor.getAllValues();
+
+            // expected the final group subscribed topics to be updated to "topic1" and "topic2"
+            Set<String> expectedTopicsGotCalled = new HashSet<>(Arrays.asList(topic1, topic2));
+            assertEquals(expectedTopicsGotCalled, capturedTopics.get(1));
+        }
+    }
+
     @Test
     public void testSelectRebalanceProtcol() {
         List<ConsumerPartitionAssignor> assignors = new ArrayList<>();
@@ -265,14 +325,14 @@ public abstract class ConsumerCoordinatorTest {
         assignors.add(new MockPartitionAssignor(Collections.singletonList(COOPERATIVE)));
 
         // no commonly supported protocols
-        assertThrows(IllegalArgumentException.class, () -> buildCoordinator(rebalanceConfig, new Metrics(), assignors, false));
+        assertThrows(IllegalArgumentException.class, () -> buildCoordinator(rebalanceConfig, new Metrics(), assignors, false, subscriptions));
 
         assignors.clear();
         assignors.add(new MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER, COOPERATIVE)));
         assignors.add(new MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER, COOPERATIVE)));
 
         // select higher indexed (more advanced) protocols
-        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, false)) {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, false, subscriptions)) {
             assertEquals(COOPERATIVE, coordinator.getProtocol());
         }
     }
@@ -1586,7 +1646,7 @@ public abstract class ConsumerCoordinatorTest {
         metadata = new ConsumerMetadata(0, Long.MAX_VALUE, includeInternalTopics,
                 false, subscriptions, new LogContext(), new ClusterResourceListeners());
         client = new MockClient(time, metadata);
-        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, false)) {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, false, subscriptions)) {
             subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
             Node node = new Node(0, "localhost", 9999);
             MetadataResponse.PartitionMetadata partitionMetadata =
@@ -1722,7 +1782,7 @@ public abstract class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitDynamicAssignment() {
-        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)
         ) {
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
             joinAsFollowerAndReceiveAssignment(coordinator, singletonList(t1p));
@@ -1736,7 +1796,7 @@ public abstract class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitRetryBackoff() {
-        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
             joinAsFollowerAndReceiveAssignment(coordinator, singletonList(t1p));
 
@@ -1769,7 +1829,7 @@ public abstract class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitAwaitsInterval() {
-        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
             joinAsFollowerAndReceiveAssignment(coordinator, singletonList(t1p));
 
@@ -1806,7 +1866,7 @@ public abstract class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitDynamicAssignmentRebalance() {
-        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
             client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
             coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -1830,7 +1890,7 @@ public abstract class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitManualAssignment() {
-        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
             subscriptions.assignFromUser(singleton(t1p));
             subscriptions.seek(t1p, 100);
             client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
@@ -1845,7 +1905,7 @@ public abstract class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitManualAssignmentCoordinatorUnknown() {
-        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
             subscriptions.assignFromUser(singleton(t1p));
             subscriptions.seek(t1p, 100);
 
@@ -2694,7 +2754,7 @@ public abstract class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitAfterCoordinatorBackToService() {
-        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
             subscriptions.assignFromUser(Collections.singleton(t1p));
             subscriptions.seek(t1p, 100L);
 
@@ -2907,7 +2967,8 @@ public abstract class ConsumerCoordinatorTest {
         ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
                                                            new Metrics(),
                                                            assignors,
-                                                           autoCommit);
+                                                           autoCommit,
+                                                           subscriptions);
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
         if (useGroupManagement) {
@@ -2996,14 +3057,15 @@ public abstract class ConsumerCoordinatorTest {
     private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig rebalanceConfig,
                                                  final Metrics metrics,
                                                  final List<ConsumerPartitionAssignor> assignors,
-                                                 final boolean autoCommitEnabled) {
+                                                 final boolean autoCommitEnabled,
+                                                 final SubscriptionState subscriptionState) {
         return new ConsumerCoordinator(
                 rebalanceConfig,
                 new LogContext(),
                 consumerClient,
                 assignors,
                 metadata,
-                subscriptions,
+                subscriptionState,
                 metrics,
                 consumerId + groupId,
                 time,