Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/20 14:44:51 UTC

[GitHub] [kafka] lucasbru commented on a diff in pull request #13023: KAFKA-14532: Correctly handle failed fetch when partitions unassigned

lucasbru commented on code in PR #13023:
URL: https://github.com/apache/kafka/pull/13023#discussion_r1053392985


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -4762,6 +4763,39 @@ public void testFetchDisconnectedShouldClearPreferredReadReplica() {
         assertEquals(-1, selected.id());
     }
 
+    @Test
+    public void testFetchDisconnectedShouldNotClearPreferredReadReplicaIfUnassigned() {
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(),
+            Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis());
+
+        subscriptions.assignFromUser(singleton(tp0));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4), tp -> validLeaderEpoch, topicIds, false));
+        subscriptions.seek(tp0, 0);
+        assertEquals(1, fetcher.sendFetches());
+
+        // Set preferred read replica to node=1
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+
+        // Verify
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(1, selected.id());
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Disconnect and remove tp0 from assignment
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0), true);
+        subscriptions.assignFromUser(emptySet());
+
+        // Preferred read replica should not be cleared
+        consumerClient.poll(time.timer(0));
+        assertFalse(fetcher.hasCompletedFetches());
+        fetchedRecords();

Review Comment:
   Done



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -346,7 +346,11 @@ public void onFailure(RuntimeException e) {
                             FetchSessionHandler handler = sessionHandler(fetchTarget.id());
                             if (handler != null) {
                                 handler.handleError(e);
-                                handler.sessionTopicPartitions().forEach(subscriptions::clearPreferredReadReplica);
+                                // Make sure to filter out topic partitions that are not part of the assignment
+                                // anymore when the request fails.
+                                handler.sessionTopicPartitions().stream()
+                                    .filter(subscriptions::isAssigned)
+                                    .forEach(subscriptions::clearPreferredReadReplica);

Review Comment:
   Good point! I was thinking the `Fetcher` object monitor protects us, but you are right, it doesn't.
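
   For readers following the hunk above, here is a minimal, self-contained sketch of the filter-then-clear pattern. It is not the Kafka code: `AssignmentState`, `onFetchFailure`, and the string partition names are hypothetical stand-ins, and the sketch assumes that clearing the preferred read replica of an unassigned partition throws, which is what the PR title suggests the change guards against.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class FilterUnassignedSketch {

    // Hypothetical stand-in for the consumer's subscription state (not Kafka's real class).
    static class AssignmentState {
        // Assigned partition -> preferred read replica id (empty if none is set).
        private final Map<String, Optional<Integer>> preferredReplica = new HashMap<>();

        synchronized void assign(String partition, int replicaId) {
            preferredReplica.put(partition, Optional.of(replicaId));
        }

        synchronized void unassign(String partition) {
            preferredReplica.remove(partition);
        }

        synchronized boolean isAssigned(String partition) {
            return preferredReplica.containsKey(partition);
        }

        // Assumption: clearing state for an unassigned partition is an error.
        synchronized void clearPreferredReadReplica(String partition) {
            if (!preferredReplica.containsKey(partition)) {
                throw new IllegalStateException("No current assignment for partition " + partition);
            }
            preferredReplica.put(partition, Optional.empty());
        }

        synchronized Optional<Integer> preferredReadReplica(String partition) {
            return preferredReplica.getOrDefault(partition, Optional.empty());
        }
    }

    // Mirrors the shape of the patched failure path: skip partitions that are
    // no longer assigned, then clear the preferred replica for the rest.
    static void onFetchFailure(AssignmentState state, List<String> sessionPartitions) {
        sessionPartitions.stream()
            .filter(state::isAssigned)
            .forEach(state::clearPreferredReadReplica);
    }

    public static void main(String[] args) {
        AssignmentState state = new AssignmentState();
        state.assign("tp0", 1);
        state.assign("tp1", 1);
        state.unassign("tp1"); // tp1 leaves the assignment while a fetch is in flight

        // The fetch session still lists both partitions; only tp0 is cleared.
        onFetchFailure(state, List.of("tp0", "tp1"));
        System.out.println("tp0 preferred replica present: "
            + state.preferredReadReplica("tp0").isPresent());
    }
}
```

   The sketch is single-threaded and does not model the locking question raised in this thread; it only shows why the unfiltered `forEach` can fail once a session partition has been unassigned.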


