You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/02/06 23:12:35 UTC

[kafka] branch 3.1 updated: KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode (#11631)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new d83825b  KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode (#11631)
d83825b is described below

commit d83825bfc99503ca946256d2097a4175b4518bbb
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Mon Feb 7 07:07:59 2022 +0800

    KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode (#11631)
    
    After KAFKA-10793, we clear the findCoordinatorFuture in 2 places:
    
    1. heartbeat thread
    2. AbstractCoordinator#ensureCoordinatorReady
    
    But in non-consumer-group mode with a group id provided (for offset commits, so that a ConsumerCoordinator is still created), there is no (1) heartbeat thread, and (2) AbstractCoordinator#ensureCoordinatorReady is only called the first time the consumer wants to fetch the committed offset position. That is, after the 2nd lookupCoordinator call, we have no chance to clear the findCoordinatorFuture, which causes offset commits to never succeed.
    
    Because of the race condition described in KAFKA-10793, it is not safe to clear the findCoordinatorFuture in the future listener. So, I think we can fix this issue by calling AbstractCoordinator#ensureCoordinatorReady whenever the coordinator is unknown in the non-consumer-group case, on each ConsumerCoordinator#poll.
    
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../consumer/internals/ConsumerCoordinator.java    |  18 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 185 ++++++---------------
 .../internals/ConsumerCoordinatorTest.java         |  15 ++
 .../kafka/api/AuthorizerIntegrationTest.scala      |   3 +-
 4 files changed, 74 insertions(+), 147 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index fad7f92..a7194a0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -455,6 +455,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
+    private boolean coordinatorUnknownAndUnready(Timer timer) {
+        return coordinatorUnknown() && !ensureCoordinatorReady(timer);
+    }
+
     /**
      * Poll for coordinator events. This ensures that the coordinator is known and that the consumer
      * has joined the group (if it is using group management). This also handles periodic offset commits
@@ -480,7 +484,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
             // group proactively due to application inactivity even if (say) the coordinator cannot be found.
             pollHeartbeat(timer.currentTimeMs());
-            if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
+            if (coordinatorUnknownAndUnready(timer)) {
                 return false;
             }
 
@@ -517,15 +521,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 }
             }
         } else {
-            // For manually assigned partitions, if there are no ready nodes, await metadata.
+            // For manually assigned partitions, if coordinator is unknown, make sure we lookup one and await metadata.
             // If connections to all nodes fail, wakeups triggered while attempting to send fetch
             // requests result in polls returning immediately, causing a tight loop of polls. Without
             // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
-            // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
-            // When group management is used, metadata wait is already performed for this scenario as
-            // coordinator is unknown, hence this check is not required.
-            if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
-                client.awaitMetadataUpdate(timer);
+            // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
+            if (coordinatorUnknownAndUnready(timer)) {
+                return false;
             }
         }
 
@@ -1021,7 +1023,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             return true;
 
         do {
-            if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
+            if (coordinatorUnknownAndUnready(timer)) {
                 return false;
             }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 9b79473..f8b86b8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -192,6 +192,9 @@ public class KafkaConsumerTest {
     private final String partitionLost = "Hit partition lost ";
 
     private final Collection<TopicPartition> singleTopicPartition = Collections.singleton(new TopicPartition(topic, 0));
+    private final Time time = new MockTime();
+    private final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+    private final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
     @Test
     public void testMetricsReporterAutoGeneratedClientId() {
@@ -266,12 +269,9 @@ public class KafkaConsumerTest {
     }
 
     private KafkaConsumer<String, String> setUpConsumerWithRecordsToPoll(TopicPartition tp, int recordCount, Deserializer<String> deserializer) {
-        Time time = new MockTime();
         Cluster cluster = TestUtils.singletonCluster(tp.topic(), 1);
         Node node = cluster.nodes().get(0);
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 1));
@@ -538,16 +538,12 @@ public class KafkaConsumerTest {
 
     @Test
     public void verifyHeartbeatSent() throws Exception {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
 
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@@ -573,14 +569,11 @@ public class KafkaConsumerTest {
 
     @Test
     public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@@ -607,16 +600,12 @@ public class KafkaConsumerTest {
 
     @Test
     public void verifyPollTimesOutDuringMetadataUpdate() {
-        final Time time = new MockTime();
-        final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         // Since we would enable the heartbeat thread after received join-response which could
@@ -635,16 +624,12 @@ public class KafkaConsumerTest {
     @SuppressWarnings("deprecation")
     @Test
     public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() {
-        final Time time = new MockTime();
-        final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -660,15 +645,12 @@ public class KafkaConsumerTest {
 
     @Test
     public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, null, groupInstanceId, false);
         consumer.assign(singleton(tp0));
         consumer.seekToBeginning(singleton(tp0));
 
@@ -684,21 +666,57 @@ public class KafkaConsumerTest {
     }
 
     @Test
+    public void verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit() {
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
+        // create a consumer with groupID with manual assignment
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer.assign(singleton(tp0));
+
+        // 1st coordinator error should cause coordinator unknown
+        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE, groupId, node), node);
+        consumer.poll(Duration.ofMillis(0));
+
+        // 2nd coordinator lookup should find the correct coordinator and clear the findCoordinatorFuture
+        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
+
+        client.prepareResponse(offsetResponse(Collections.singletonMap(tp0, 50L), Errors.NONE));
+        client.prepareResponse(fetchResponse(tp0, 50L, 5));
+
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0));
+        assertEquals(5, records.count());
+        assertEquals(55L, consumer.position(tp0));
+
+        // after coordinator found, consumer should be able to commit the offset successfully
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp0, Errors.NONE)));
+        consumer.commitSync(Collections.singletonMap(tp0, new OffsetAndMetadata(55L)));
+
+        // verify the offset is committed
+        client.prepareResponse(offsetResponse(Collections.singletonMap(tp0, 55L), Errors.NONE));
+        assertEquals(55, consumer.committed(Collections.singleton(tp0), Duration.ZERO).get(tp0).offset());
+        consumer.close(Duration.ofMillis(0));
+    }
+
+    @Test
     public void testFetchProgressWithMissingPartitionPosition() {
         // Verifies that we can make progress on one partition while we are awaiting
         // a reset on another partition.
 
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 2));
+        Node node = metadata.fetch().nodes().get(0);
 
         KafkaConsumer<String, String> consumer = newConsumerNoAutoCommit(time, client, subscription, metadata);
         consumer.assign(Arrays.asList(tp0, tp1));
         consumer.seekToEnd(singleton(tp0));
         consumer.seekToBeginning(singleton(tp1));
 
+        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
         client.prepareResponse(body -> {
             ListOffsetsRequest request = (ListOffsetsRequest) body;
             List<ListOffsetsPartition> partitions = request.topics().stream().flatMap(t -> {
@@ -742,7 +760,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testMissingOffsetNoResetPolicy() {
-        Time time = new MockTime();
         SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
@@ -750,8 +767,6 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
                 true, groupId, groupInstanceId, false);
         consumer.assign(singletonList(tp0));
@@ -766,7 +781,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testResetToCommittedOffset() {
-        Time time = new MockTime();
         SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
@@ -774,8 +788,6 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
                 true, groupId, groupInstanceId, false);
         consumer.assign(singletonList(tp0));
@@ -791,7 +803,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testResetUsingAutoResetPolicy() {
-        Time time = new MockTime();
         SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
@@ -799,8 +810,6 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
                 true, groupId, groupInstanceId, false);
         consumer.assign(singletonList(tp0));
@@ -818,15 +827,12 @@ public class KafkaConsumerTest {
 
     @Test
     public void testOffsetIsValidAfterSeek() {
-        Time time = new MockTime();
         SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
                 true, groupId, Optional.empty(), false);
         consumer.assign(singletonList(tp0));
@@ -840,16 +846,12 @@ public class KafkaConsumerTest {
         long offset1 = 10000;
         long offset2 = 20000;
 
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 2));
         Node node = metadata.fetch().nodes().get(0);
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.assign(singletonList(tp0));
 
@@ -894,8 +896,6 @@ public class KafkaConsumerTest {
     private KafkaConsumer<String, String> setupThrowableConsumer() {
         long offset1 = 10000;
 
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -904,8 +904,6 @@ public class KafkaConsumerTest {
 
         Node node = metadata.fetch().nodes().get(0);
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         KafkaConsumer<String, String> consumer = newConsumer(
             time, client, subscription, metadata, assignor, true, groupId, groupInstanceId, true);
         consumer.assign(singletonList(tp0));
@@ -922,16 +920,12 @@ public class KafkaConsumerTest {
     public void testNoCommittedOffsets() {
         long offset1 = 10000;
 
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 2));
         Node node = metadata.fetch().nodes().get(0);
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.assign(Arrays.asList(tp0, tp1));
 
@@ -951,16 +945,12 @@ public class KafkaConsumerTest {
 
     @Test
     public void testAutoCommitSentBeforePositionUpdate() {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -988,8 +978,6 @@ public class KafkaConsumerTest {
     @Test
     public void testRegexSubscription() {
         String unmatchedTopic = "unmatched";
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1000,8 +988,6 @@ public class KafkaConsumerTest {
         initMetadata(client, partitionCounts);
         Node node = metadata.fetch().nodes().get(0);
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
 
@@ -1018,13 +1004,9 @@ public class KafkaConsumerTest {
 
     @Test
     public void testChangingRegexSubscription() {
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         String otherTopic = "other";
         TopicPartition otherTopicPartition = new TopicPartition(otherTopic, 0);
 
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1057,16 +1039,12 @@ public class KafkaConsumerTest {
 
     @Test
     public void testWakeupWithFetchDataAvailable() throws Exception {
-        final Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -1098,16 +1076,12 @@ public class KafkaConsumerTest {
 
     @Test
     public void testPollThrowsInterruptExceptionIfInterrupted() {
-        final Time time = new MockTime();
-        final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -1128,16 +1102,12 @@ public class KafkaConsumerTest {
 
     @Test
     public void fetchResponseWithUnexpectedPartitionIsIgnored() {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        ConsumerPartitionAssignor assignor = new RangeAssignor();
-
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singletonList(topic), getConsumerRebalanceListener(consumer));
 
@@ -1164,8 +1134,6 @@ public class KafkaConsumerTest {
      */
     @Test
     public void testSubscriptionChangesWithAutoCommitEnabled() {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1279,8 +1247,6 @@ public class KafkaConsumerTest {
      */
     @Test
     public void testSubscriptionChangesWithAutoCommitDisabled() {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1336,8 +1302,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration() {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1361,8 +1325,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testUnsubscribeShouldTriggerPartitionsLostWithNoGeneration() throws Exception {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1398,8 +1360,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testManualAssignmentChangeWithAutoCommitEnabled() {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1455,8 +1415,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testManualAssignmentChangeWithAutoCommitDisabled() {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1512,8 +1470,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testOffsetOfPausedPartitions() {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1707,16 +1663,12 @@ public class KafkaConsumerTest {
 
     @Test
     public void testShouldAttemptToRejoinGroupAfterSyncGroupFailed() throws Exception {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
@@ -1779,16 +1731,12 @@ public class KafkaConsumerTest {
                                    List<? extends AbstractResponse> responses,
                                    long waitMs,
                                    boolean interrupt) throws Exception {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, Optional.empty());
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -1865,8 +1813,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testPartitionsForNonExistingTopic() {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1879,8 +1825,6 @@ public class KafkaConsumerTest {
             Collections.emptyList());
         client.prepareResponse(updateResponse);
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         assertEquals(Collections.emptyList(), consumer.partitionsFor("non-exist-topic"));
     }
@@ -1954,7 +1898,6 @@ public class KafkaConsumerTest {
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 2));
         Node node = metadata.fetch().nodes().get(0);
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata,
             assignor, true, groupInstanceId);
         consumer.assign(singletonList(tp0));
@@ -1999,7 +1942,6 @@ public class KafkaConsumerTest {
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 2));
         Node node = metadata.fetch().nodes().get(0);
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata,
             assignor, true, groupInstanceId);
         consumer.assign(singletonList(tp0));
@@ -2022,16 +1964,12 @@ public class KafkaConsumerTest {
 
     @Test
     public void testRebalanceException() {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
 
         consumer.subscribe(singleton(topic), getExceptionConsumerRebalanceListener());
@@ -2069,7 +2007,6 @@ public class KafkaConsumerTest {
     @Test
     public void testReturnRecordsDuringRebalance() throws InterruptedException {
         Time time = new MockTime(1L);
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor();
@@ -2194,16 +2131,12 @@ public class KafkaConsumerTest {
 
     @Test
     public void testGetGroupMetadata() {
-        final Time time = new MockTime();
-        final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         final Node node = metadata.fetch().nodes().get(0);
 
-        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
 
         final ConsumerGroupMetadata groupMetadataOnStart = consumer.groupMetadata();
@@ -2228,8 +2161,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testInvalidGroupMetadata() throws InterruptedException {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 1));
@@ -2257,13 +2188,10 @@ public class KafkaConsumerTest {
 
     @Test
     public void testCurrentLag() {
-        final Time time = new MockTime();
-        final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, singletonMap(topic, 1));
-        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer =
             newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
@@ -2315,13 +2243,10 @@ public class KafkaConsumerTest {
 
     @Test
     public void testListOffsetShouldUpateSubscriptions() {
-        final Time time = new MockTime();
-        final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, singletonMap(topic, 1));
-        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer =
                 newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
@@ -2343,7 +2268,6 @@ public class KafkaConsumerTest {
     }
 
     private KafkaConsumer<String, String> consumerWithPendingAuthenticationError(final Time time) {
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -2641,14 +2565,16 @@ public class KafkaConsumerTest {
         ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time,
                 retryBackoffMs, requestTimeoutMs, heartbeatIntervalMs);
 
-        GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+        ConsumerCoordinator consumerCoordinator = null;
+        if (groupId != null) {
+            GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
                 rebalanceTimeoutMs,
                 heartbeatIntervalMs,
                 groupId,
                 groupInstanceId,
                 retryBackoffMs,
                 true);
-        ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(rebalanceConfig,
+            consumerCoordinator = new ConsumerCoordinator(rebalanceConfig,
                 loggerFactory,
                 consumerClient,
                 assignors,
@@ -2661,6 +2587,7 @@ public class KafkaConsumerTest {
                 autoCommitIntervalMs,
                 interceptors,
                 throwOnStableOffsetNotSupported);
+        }
         Fetcher<String, String> fetcher = new Fetcher<>(
                 loggerFactory,
                 consumerClient,
@@ -2723,16 +2650,12 @@ public class KafkaConsumerTest {
 
     @Test
     public void testSubscriptionOnInvalidTopic() {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Cluster cluster = metadata.fetch();
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         String invalidTopicName = "topic abc";  // Invalid topic name due to space
 
         List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
@@ -2752,14 +2675,10 @@ public class KafkaConsumerTest {
 
     @Test
     public void testPollTimeMetrics() {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 1));
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singletonList(topic));
         // MetricName objects to check
@@ -2801,14 +2720,10 @@ public class KafkaConsumerTest {
 
     @Test
     public void testPollIdleRatio() {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 1));
 
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         // MetricName object to check
         Metrics metrics = consumer.metrics;
@@ -2851,8 +2766,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testClosingConsumerUnregistersConsumerMetrics() {
-        Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 1));
@@ -2879,10 +2792,8 @@ public class KafkaConsumerTest {
     @Test
     public void testEnforceRebalanceTriggersRebalanceOnNextPoll() {
         Time time = new MockTime(1L);
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
-        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         MockRebalanceListener countingRebalanceListener = new MockRebalanceListener();
         initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
@@ -2976,8 +2887,6 @@ public class KafkaConsumerTest {
     }
 
     private KafkaConsumer<String, String> consumerForCheckingTimeoutException() {
-        final Time time = new MockTime();
-        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 96aaf8b..b073995 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -478,6 +478,21 @@ public abstract class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testCoordinatorNotAvailableWithUserAssignedType() {
+        subscriptions.assignFromUser(Collections.singleton(t1p));
+        // should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE));
+        // set timeout to 0 because we don't want to retry after the error
+        coordinator.poll(time.timer(0));
+        assertTrue(coordinator.coordinatorUnknown());
+
+        // should find an available node in the next find coordinator request
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.poll(time.timer(Long.MAX_VALUE));
+        assertFalse(coordinator.coordinatorUnknown());
+    }
+
+    @Test
     public void testCoordinatorNotAvailable() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 0b215ff..c7e8cb9 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1038,7 +1038,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource)
 
     // in this case, we do an explicit seek, so there should be no need to query the coordinator at all
-    val consumer = createConsumer()
+    // remove the group.id config so that no coordinator is created
+    val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
     consumer.assign(List(tp).asJava)
     consumer.seekToBeginning(List(tp).asJava)
     consumeRecords(consumer)