You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/12/21 08:31:38 UTC
[kafka] branch 3.3 updated: KAFKA-14532: Correctly handle failed fetch when partitions unassigned (#13023)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 7be2d9fb67b KAFKA-14532: Correctly handle failed fetch when partitions unassigned (#13023)
7be2d9fb67b is described below
commit 7be2d9fb67b0f416c7c25bca2f92b6e6360deba5
Author: Lucas Brutschy <lu...@users.noreply.github.com>
AuthorDate: Wed Dec 21 09:17:11 2022 +0100
KAFKA-14532: Correctly handle failed fetch when partitions unassigned (#13023)
The failure handling code for fetches could run into an IllegalStateException if a fetch response came back with a failure after the corresponding topic partition has already been removed from the assignment.
Reviewers: David Jacot <dj...@confluent.io>
---
.../consumer/internals/SubscriptionState.java | 7 ++++-
.../clients/consumer/internals/FetcherTest.java | 36 ++++++++++++++++++++++
2 files changed, 42 insertions(+), 1 deletion(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 03b664e1392..b41122f0e50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -620,7 +620,12 @@ public class SubscriptionState {
* @return the removed preferred read replica if set, None otherwise.
*/
public synchronized Optional<Integer> clearPreferredReadReplica(TopicPartition tp) {
- return assignedState(tp).clearPreferredReadReplica();
+ final TopicPartitionState topicPartitionState = assignedStateOrNull(tp);
+ if (topicPartitionState == null) {
+ return Optional.empty();
+ } else {
+ return topicPartitionState.clearPreferredReadReplica();
+ }
}
public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 931ff2070fb..3588df2505c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -137,6 +137,7 @@ import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
@@ -4764,6 +4765,41 @@ public class FetcherTest {
assertEquals(-1, selected.id());
}
+ @Test
+ public void testFetchDisconnectedShouldNotClearPreferredReadReplicaIfUnassigned() {
+ buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(),
+ Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis());
+
+ subscriptions.assignFromUser(singleton(tp0));
+ client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4), tp -> validLeaderEpoch, topicIds, false));
+ subscriptions.seek(tp0, 0);
+ assertEquals(1, fetcher.sendFetches());
+
+ // Set preferred read replica to node=1
+ client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L,
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
+ consumerClient.poll(time.timer(0));
+ assertTrue(fetcher.hasCompletedFetches());
+ fetchedRecords();
+
+ // Verify
+ Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+ assertEquals(1, selected.id());
+ assertEquals(1, fetcher.sendFetches());
+ assertFalse(fetcher.hasCompletedFetches());
+
+ // Disconnect and remove tp0 from assignment
+ client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0), true);
+ subscriptions.assignFromUser(emptySet());
+
+ // Preferred read replica should not be cleared
+ consumerClient.poll(time.timer(0));
+ assertFalse(fetcher.hasCompletedFetches());
+ fetchedRecords();
+ selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+ assertEquals(-1, selected.id());
+ }
+
@Test
public void testFetchErrorShouldClearPreferredReadReplica() {
buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(),