You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/02/06 23:12:35 UTC
[kafka] branch 3.1 updated: KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode (#11631)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push:
new d83825b KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode (#11631)
d83825b is described below
commit d83825bfc99503ca946256d2097a4175b4518bbb
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Mon Feb 7 07:07:59 2022 +0800
KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode (#11631)
After KAFKA-10793, we clear the findCoordinatorFuture in 2 places:
1. heartbeat thread
2. AbstractCoordinator#ensureCoordinatorReady
But in non consumer group mode with group id provided (for offset commitment. So that there will be consumerCoordinator created), there will be no (1)heartbeat thread , and it only call (2)AbstractCoordinator#ensureCoordinatorReady when 1st time consumer wants to fetch committed offset position. That is, after 2nd lookupCoordinator call, we have no chance to clear the findCoordinatorFuture , and causes the offset commit never succeeded.
To avoid the race condition as KAFKA-10793 mentioned, it's not safe to clear the findCoordinatorFuture in the future listener. So, I think we can fix this issue by calling AbstractCoordinator#ensureCoordinatorReady when coordinator unknown in non consumer group case, under each ConsumerCoordinator#poll.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../consumer/internals/ConsumerCoordinator.java | 18 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 185 ++++++---------------
.../internals/ConsumerCoordinatorTest.java | 15 ++
.../kafka/api/AuthorizerIntegrationTest.scala | 3 +-
4 files changed, 74 insertions(+), 147 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index fad7f92..a7194a0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -455,6 +455,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
}
+ private boolean coordinatorUnknownAndUnready(Timer timer) {
+ return coordinatorUnknown() && !ensureCoordinatorReady(timer);
+ }
+
/**
* Poll for coordinator events. This ensures that the coordinator is known and that the consumer
* has joined the group (if it is using group management). This also handles periodic offset commits
@@ -480,7 +484,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// Always update the heartbeat last poll time so that the heartbeat thread does not leave the
// group proactively due to application inactivity even if (say) the coordinator cannot be found.
pollHeartbeat(timer.currentTimeMs());
- if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
+ if (coordinatorUnknownAndUnready(timer)) {
return false;
}
@@ -517,15 +521,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
}
} else {
- // For manually assigned partitions, if there are no ready nodes, await metadata.
+ // For manually assigned partitions, if coordinator is unknown, make sure we lookup one and await metadata.
// If connections to all nodes fail, wakeups triggered while attempting to send fetch
// requests result in polls returning immediately, causing a tight loop of polls. Without
// the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
- // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
- // When group management is used, metadata wait is already performed for this scenario as
- // coordinator is unknown, hence this check is not required.
- if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
- client.awaitMetadataUpdate(timer);
+ // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
+ if (coordinatorUnknownAndUnready(timer)) {
+ return false;
}
}
@@ -1021,7 +1023,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
return true;
do {
- if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
+ if (coordinatorUnknownAndUnready(timer)) {
return false;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 9b79473..f8b86b8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -192,6 +192,9 @@ public class KafkaConsumerTest {
private final String partitionLost = "Hit partition lost ";
private final Collection<TopicPartition> singleTopicPartition = Collections.singleton(new TopicPartition(topic, 0));
+ private final Time time = new MockTime();
+ private final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+ private final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
@Test
public void testMetricsReporterAutoGeneratedClientId() {
@@ -266,12 +269,9 @@ public class KafkaConsumerTest {
}
private KafkaConsumer<String, String> setUpConsumerWithRecordsToPoll(TopicPartition tp, int recordCount, Deserializer<String> deserializer) {
- Time time = new MockTime();
Cluster cluster = TestUtils.singletonCluster(tp.topic(), 1);
Node node = cluster.nodes().get(0);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
@@ -538,16 +538,12 @@ public class KafkaConsumerTest {
@Test
public void verifyHeartbeatSent() throws Exception {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@@ -573,14 +569,11 @@ public class KafkaConsumerTest {
@Test
public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@@ -607,16 +600,12 @@ public class KafkaConsumerTest {
@Test
public void verifyPollTimesOutDuringMetadataUpdate() {
- final Time time = new MockTime();
- final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
// Since we would enable the heartbeat thread after received join-response which could
@@ -635,16 +624,12 @@ public class KafkaConsumerTest {
@SuppressWarnings("deprecation")
@Test
public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() {
- final Time time = new MockTime();
- final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -660,15 +645,12 @@ public class KafkaConsumerTest {
@Test
public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, null, groupInstanceId, false);
consumer.assign(singleton(tp0));
consumer.seekToBeginning(singleton(tp0));
@@ -684,21 +666,57 @@ public class KafkaConsumerTest {
}
@Test
+ public void verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit() {
+ ConsumerMetadata metadata = createMetadata(subscription);
+ MockClient client = new MockClient(time, metadata);
+
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Node node = metadata.fetch().nodes().get(0);
+
+ // create a consumer with groupID with manual assignment
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ consumer.assign(singleton(tp0));
+
+ // 1st coordinator error should cause coordinator unknown
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE, groupId, node), node);
+ consumer.poll(Duration.ofMillis(0));
+
+ // 2nd coordinator error should find the correct coordinator and clear the findCoordinatorFuture
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
+
+ client.prepareResponse(offsetResponse(Collections.singletonMap(tp0, 50L), Errors.NONE));
+ client.prepareResponse(fetchResponse(tp0, 50L, 5));
+
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0));
+ assertEquals(5, records.count());
+ assertEquals(55L, consumer.position(tp0));
+
+ // after coordinator found, consumer should be able to commit the offset successfully
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp0, Errors.NONE)));
+ consumer.commitSync(Collections.singletonMap(tp0, new OffsetAndMetadata(55L)));
+
+ // verify the offset is committed
+ client.prepareResponse(offsetResponse(Collections.singletonMap(tp0, 55L), Errors.NONE));
+ assertEquals(55, consumer.committed(Collections.singleton(tp0), Duration.ZERO).get(tp0).offset());
+ consumer.close(Duration.ofMillis(0));
+ }
+
+ @Test
public void testFetchProgressWithMissingPartitionPosition() {
// Verifies that we can make progress on one partition while we are awaiting
// a reset on another partition.
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 2));
+ Node node = metadata.fetch().nodes().get(0);
KafkaConsumer<String, String> consumer = newConsumerNoAutoCommit(time, client, subscription, metadata);
consumer.assign(Arrays.asList(tp0, tp1));
consumer.seekToEnd(singleton(tp0));
consumer.seekToBeginning(singleton(tp1));
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
client.prepareResponse(body -> {
ListOffsetsRequest request = (ListOffsetsRequest) body;
List<ListOffsetsPartition> partitions = request.topics().stream().flatMap(t -> {
@@ -742,7 +760,6 @@ public class KafkaConsumerTest {
@Test
public void testMissingOffsetNoResetPolicy() {
- Time time = new MockTime();
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -750,8 +767,6 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
true, groupId, groupInstanceId, false);
consumer.assign(singletonList(tp0));
@@ -766,7 +781,6 @@ public class KafkaConsumerTest {
@Test
public void testResetToCommittedOffset() {
- Time time = new MockTime();
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -774,8 +788,6 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
true, groupId, groupInstanceId, false);
consumer.assign(singletonList(tp0));
@@ -791,7 +803,6 @@ public class KafkaConsumerTest {
@Test
public void testResetUsingAutoResetPolicy() {
- Time time = new MockTime();
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -799,8 +810,6 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
true, groupId, groupInstanceId, false);
consumer.assign(singletonList(tp0));
@@ -818,15 +827,12 @@ public class KafkaConsumerTest {
@Test
public void testOffsetIsValidAfterSeek() {
- Time time = new MockTime();
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
true, groupId, Optional.empty(), false);
consumer.assign(singletonList(tp0));
@@ -840,16 +846,12 @@ public class KafkaConsumerTest {
long offset1 = 10000;
long offset2 = 20000;
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 2));
Node node = metadata.fetch().nodes().get(0);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.assign(singletonList(tp0));
@@ -894,8 +896,6 @@ public class KafkaConsumerTest {
private KafkaConsumer<String, String> setupThrowableConsumer() {
long offset1 = 10000;
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -904,8 +904,6 @@ public class KafkaConsumerTest {
Node node = metadata.fetch().nodes().get(0);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
KafkaConsumer<String, String> consumer = newConsumer(
time, client, subscription, metadata, assignor, true, groupId, groupInstanceId, true);
consumer.assign(singletonList(tp0));
@@ -922,16 +920,12 @@ public class KafkaConsumerTest {
public void testNoCommittedOffsets() {
long offset1 = 10000;
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 2));
Node node = metadata.fetch().nodes().get(0);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.assign(Arrays.asList(tp0, tp1));
@@ -951,16 +945,12 @@ public class KafkaConsumerTest {
@Test
public void testAutoCommitSentBeforePositionUpdate() {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -988,8 +978,6 @@ public class KafkaConsumerTest {
@Test
public void testRegexSubscription() {
String unmatchedTopic = "unmatched";
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1000,8 +988,6 @@ public class KafkaConsumerTest {
initMetadata(client, partitionCounts);
Node node = metadata.fetch().nodes().get(0);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
@@ -1018,13 +1004,9 @@ public class KafkaConsumerTest {
@Test
public void testChangingRegexSubscription() {
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
String otherTopic = "other";
TopicPartition otherTopicPartition = new TopicPartition(otherTopic, 0);
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1057,16 +1039,12 @@ public class KafkaConsumerTest {
@Test
public void testWakeupWithFetchDataAvailable() throws Exception {
- final Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -1098,16 +1076,12 @@ public class KafkaConsumerTest {
@Test
public void testPollThrowsInterruptExceptionIfInterrupted() {
- final Time time = new MockTime();
- final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -1128,16 +1102,12 @@ public class KafkaConsumerTest {
@Test
public void fetchResponseWithUnexpectedPartitionIsIgnored() {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- ConsumerPartitionAssignor assignor = new RangeAssignor();
-
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singletonList(topic), getConsumerRebalanceListener(consumer));
@@ -1164,8 +1134,6 @@ public class KafkaConsumerTest {
*/
@Test
public void testSubscriptionChangesWithAutoCommitEnabled() {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1279,8 +1247,6 @@ public class KafkaConsumerTest {
*/
@Test
public void testSubscriptionChangesWithAutoCommitDisabled() {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1336,8 +1302,6 @@ public class KafkaConsumerTest {
@Test
public void testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration() {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1361,8 +1325,6 @@ public class KafkaConsumerTest {
@Test
public void testUnsubscribeShouldTriggerPartitionsLostWithNoGeneration() throws Exception {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1398,8 +1360,6 @@ public class KafkaConsumerTest {
@Test
public void testManualAssignmentChangeWithAutoCommitEnabled() {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1455,8 +1415,6 @@ public class KafkaConsumerTest {
@Test
public void testManualAssignmentChangeWithAutoCommitDisabled() {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1512,8 +1470,6 @@ public class KafkaConsumerTest {
@Test
public void testOffsetOfPausedPartitions() {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1707,16 +1663,12 @@ public class KafkaConsumerTest {
@Test
public void testShouldAttemptToRejoinGroupAfterSyncGroupFailed() throws Exception {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
@@ -1779,16 +1731,12 @@ public class KafkaConsumerTest {
List<? extends AbstractResponse> responses,
long waitMs,
boolean interrupt) throws Exception {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, Optional.empty());
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -1865,8 +1813,6 @@ public class KafkaConsumerTest {
@Test
public void testPartitionsForNonExistingTopic() {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1879,8 +1825,6 @@ public class KafkaConsumerTest {
Collections.emptyList());
client.prepareResponse(updateResponse);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
assertEquals(Collections.emptyList(), consumer.partitionsFor("non-exist-topic"));
}
@@ -1954,7 +1898,6 @@ public class KafkaConsumerTest {
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 2));
Node node = metadata.fetch().nodes().get(0);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata,
assignor, true, groupInstanceId);
consumer.assign(singletonList(tp0));
@@ -1999,7 +1942,6 @@ public class KafkaConsumerTest {
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 2));
Node node = metadata.fetch().nodes().get(0);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata,
assignor, true, groupInstanceId);
consumer.assign(singletonList(tp0));
@@ -2022,16 +1964,12 @@ public class KafkaConsumerTest {
@Test
public void testRebalanceException() {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getExceptionConsumerRebalanceListener());
@@ -2069,7 +2007,6 @@ public class KafkaConsumerTest {
@Test
public void testReturnRecordsDuringRebalance() throws InterruptedException {
Time time = new MockTime(1L);
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor();
@@ -2194,16 +2131,12 @@ public class KafkaConsumerTest {
@Test
public void testGetGroupMetadata() {
- final Time time = new MockTime();
- final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
final Node node = metadata.fetch().nodes().get(0);
- final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
final ConsumerGroupMetadata groupMetadataOnStart = consumer.groupMetadata();
@@ -2228,8 +2161,6 @@ public class KafkaConsumerTest {
@Test
public void testInvalidGroupMetadata() throws InterruptedException {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
@@ -2257,13 +2188,10 @@ public class KafkaConsumerTest {
@Test
public void testCurrentLag() {
- final Time time = new MockTime();
- final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata);
initMetadata(client, singletonMap(topic, 1));
- final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer =
newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
@@ -2315,13 +2243,10 @@ public class KafkaConsumerTest {
@Test
public void testListOffsetShouldUpateSubscriptions() {
- final Time time = new MockTime();
- final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata);
initMetadata(client, singletonMap(topic, 1));
- final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer =
newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
@@ -2343,7 +2268,6 @@ public class KafkaConsumerTest {
}
private KafkaConsumer<String, String> consumerWithPendingAuthenticationError(final Time time) {
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -2641,14 +2565,16 @@ public class KafkaConsumerTest {
ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time,
retryBackoffMs, requestTimeoutMs, heartbeatIntervalMs);
- GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+ ConsumerCoordinator consumerCoordinator = null;
+ if (groupId != null) {
+ GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
rebalanceTimeoutMs,
heartbeatIntervalMs,
groupId,
groupInstanceId,
retryBackoffMs,
true);
- ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(rebalanceConfig,
+ consumerCoordinator = new ConsumerCoordinator(rebalanceConfig,
loggerFactory,
consumerClient,
assignors,
@@ -2661,6 +2587,7 @@ public class KafkaConsumerTest {
autoCommitIntervalMs,
interceptors,
throwOnStableOffsetNotSupported);
+ }
Fetcher<String, String> fetcher = new Fetcher<>(
loggerFactory,
consumerClient,
@@ -2723,16 +2650,12 @@ public class KafkaConsumerTest {
@Test
public void testSubscriptionOnInvalidTopic() {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Cluster cluster = metadata.fetch();
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
String invalidTopicName = "topic abc"; // Invalid topic name due to space
List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
@@ -2752,14 +2675,10 @@ public class KafkaConsumerTest {
@Test
public void testPollTimeMetrics() {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singletonList(topic));
// MetricName objects to check
@@ -2801,14 +2720,10 @@ public class KafkaConsumerTest {
@Test
public void testPollIdleRatio() {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
-
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
// MetricName object to check
Metrics metrics = consumer.metrics;
@@ -2851,8 +2766,6 @@ public class KafkaConsumerTest {
@Test
public void testClosingConsumerUnregistersConsumerMetrics() {
- Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
@@ -2879,10 +2792,8 @@ public class KafkaConsumerTest {
@Test
public void testEnforceRebalanceTriggersRebalanceOnNextPoll() {
Time time = new MockTime(1L);
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
- ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
MockRebalanceListener countingRebalanceListener = new MockRebalanceListener();
initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
@@ -2976,8 +2887,6 @@ public class KafkaConsumerTest {
}
private KafkaConsumer<String, String> consumerForCheckingTimeoutException() {
- final Time time = new MockTime();
- SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 96aaf8b..b073995 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -478,6 +478,21 @@ public abstract class ConsumerCoordinatorTest {
}
@Test
+ public void testCoordinatorNotAvailableWithUserAssignedType() {
+ subscriptions.assignFromUser(Collections.singleton(t1p));
+ // should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE));
+ // set timeout to 0 because we don't want to retry after the error
+ coordinator.poll(time.timer(0));
+ assertTrue(coordinator.coordinatorUnknown());
+
+ // should find an available node in next find coordinator request
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+ assertFalse(coordinator.coordinatorUnknown());
+ }
+
+ @Test
public void testCoordinatorNotAvailable() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 0b215ff..c7e8cb9 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1038,7 +1038,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource)
// in this case, we do an explicit seek, so there should be no need to query the coordinator at all
- val consumer = createConsumer()
+ // remove the group.id config to avoid coordinator created
+ val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
consumer.assign(List(tp).asJava)
consumer.seekToBeginning(List(tp).asJava)
consumeRecords(consumer)