You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/12/21 08:31:38 UTC

[kafka] branch 3.3 updated: KAFKA-14532: Correctly handle failed fetch when partitions unassigned (#13023)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 7be2d9fb67b KAFKA-14532: Correctly handle failed fetch when partitions unassigned (#13023)
7be2d9fb67b is described below

commit 7be2d9fb67b0f416c7c25bca2f92b6e6360deba5
Author: Lucas Brutschy <lu...@users.noreply.github.com>
AuthorDate: Wed Dec 21 09:17:11 2022 +0100

    KAFKA-14532: Correctly handle failed fetch when partitions unassigned (#13023)
    
    The failure-handling code for fetches could throw an IllegalStateException if a fetch response came back with a failure after the corresponding topic partition had already been removed from the assignment.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 .../consumer/internals/SubscriptionState.java      |  7 ++++-
 .../clients/consumer/internals/FetcherTest.java    | 36 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 03b664e1392..b41122f0e50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -620,7 +620,12 @@ public class SubscriptionState {
      * @return the removed preferred read replica if set, None otherwise.
      */
     public synchronized Optional<Integer> clearPreferredReadReplica(TopicPartition tp) {
-        return assignedState(tp).clearPreferredReadReplica();
+        final TopicPartitionState topicPartitionState = assignedStateOrNull(tp);
+        if (topicPartitionState == null) {
+            return Optional.empty();
+        } else {
+            return topicPartitionState.clearPreferredReadReplica();
+        }
     }
 
     public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 931ff2070fb..3588df2505c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -137,6 +137,7 @@ import java.util.stream.Collectors;
 import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
@@ -4764,6 +4765,41 @@ public class FetcherTest {
         assertEquals(-1, selected.id());
     }
 
+    @Test
+    public void testFetchDisconnectedShouldNotClearPreferredReadReplicaIfUnassigned() {
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(),
+            Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis());
+
+        subscriptions.assignFromUser(singleton(tp0));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4), tp -> validLeaderEpoch, topicIds, false));
+        subscriptions.seek(tp0, 0);
+        assertEquals(1, fetcher.sendFetches());
+
+        // Set preferred read replica to node=1
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+
+        // Verify
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(1, selected.id());
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Disconnect and remove tp0 from assignment
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0), true);
+        subscriptions.assignFromUser(emptySet());
+
+        // Preferred read replica should not be cleared
+        consumerClient.poll(time.timer(0));
+        assertFalse(fetcher.hasCompletedFetches());
+        fetchedRecords();
+        selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(-1, selected.id());
+    }
+
     @Test
     public void testFetchErrorShouldClearPreferredReadReplica() {
         buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(),