Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/19 21:24:33 UTC

[GitHub] [kafka] lucasbru opened a new pull request, #13023: KAFKA-14532: Correctly handle failed fetch when partitions unassigned

lucasbru opened a new pull request, #13023:
URL: https://github.com/apache/kafka/pull/13023

   The fetch failure handling code in `KafkaConsumer`
   could throw an `IllegalStateException` if a fetch response
   came back with a failure after the corresponding topic
   partition had already been removed from the assignment.
   
   A corresponding test case was added to the unit tests.
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] lucasbru commented on a diff in pull request #13023: KAFKA-14532: Correctly handle failed fetch when partitions unassigned

Posted by GitBox <gi...@apache.org>.
lucasbru commented on code in PR #13023:
URL: https://github.com/apache/kafka/pull/13023#discussion_r1053392985


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -4762,6 +4763,39 @@ public void testFetchDisconnectedShouldClearPreferredReadReplica() {
         assertEquals(-1, selected.id());
     }
 
+    @Test
+    public void testFetchDisconnectedShouldNotClearPreferredReadReplicaIfUnassigned() {
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(),
+            Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis());
+
+        subscriptions.assignFromUser(singleton(tp0));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4), tp -> validLeaderEpoch, topicIds, false));
+        subscriptions.seek(tp0, 0);
+        assertEquals(1, fetcher.sendFetches());
+
+        // Set preferred read replica to node=1
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+
+        // Verify
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(1, selected.id());
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Disconnect and remove tp0 from assignment
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0), true);
+        subscriptions.assignFromUser(emptySet());
+
+        // Preferred read replica should not be cleared
+        consumerClient.poll(time.timer(0));
+        assertFalse(fetcher.hasCompletedFetches());
+        fetchedRecords();

Review Comment:
   Done



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -346,7 +346,11 @@ public void onFailure(RuntimeException e) {
                             FetchSessionHandler handler = sessionHandler(fetchTarget.id());
                             if (handler != null) {
                                 handler.handleError(e);
-                                handler.sessionTopicPartitions().forEach(subscriptions::clearPreferredReadReplica);
+                                // Make sure to filter out topic partitions that are not part of the assignment
+                                // anymore when the request fails.
+                                handler.sessionTopicPartitions().stream()
+                                    .filter(subscriptions::isAssigned)
+                                    .forEach(subscriptions::clearPreferredReadReplica);

Review Comment:
   Good point! I was thinking the `Fetcher` object monitor protects us, but you are right, it doesn't.





[GitHub] [kafka] dajac commented on a diff in pull request #13023: KAFKA-14532: Correctly handle failed fetch when partitions unassigned

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #13023:
URL: https://github.com/apache/kafka/pull/13023#discussion_r1053276546


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -4762,6 +4763,39 @@ public void testFetchDisconnectedShouldClearPreferredReadReplica() {
         assertEquals(-1, selected.id());
     }
 
+    @Test
+    public void testFetchDisconnectedShouldNotClearPreferredReadReplicaIfUnassigned() {
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(),
+            Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis());
+
+        subscriptions.assignFromUser(singleton(tp0));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4), tp -> validLeaderEpoch, topicIds, false));
+        subscriptions.seek(tp0, 0);
+        assertEquals(1, fetcher.sendFetches());
+
+        // Set preferred read replica to node=1
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+
+        // Verify
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(1, selected.id());
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Disconnect and remove tp0 from assignment
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0), true);
+        subscriptions.assignFromUser(emptySet());
+
+        // Preferred read replica should not be cleared
+        consumerClient.poll(time.timer(0));
+        assertFalse(fetcher.hasCompletedFetches());
+        fetchedRecords();

Review Comment:
   Should we add the following assertion to be consistent with other tests?
   ```
           selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
           assertEquals(-1, selected.id());
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -346,7 +346,11 @@ public void onFailure(RuntimeException e) {
                             FetchSessionHandler handler = sessionHandler(fetchTarget.id());
                             if (handler != null) {
                                 handler.handleError(e);
-                                handler.sessionTopicPartitions().forEach(subscriptions::clearPreferredReadReplica);
+                                // Make sure to filter out topic partitions that are not part of the assignment
+                                // anymore when the request fails.
+                                handler.sessionTopicPartitions().stream()
+                                    .filter(subscriptions::isAssigned)
+                                    .forEach(subscriptions::clearPreferredReadReplica);

Review Comment:
   What would happen if the partition is removed from the subscriptions between these two lines? It seems to me that we could face the same issue.
   
   It may be better to push this check into `clearPreferredReadReplica`. Something like this:
   
   ```
       public synchronized Optional<Integer> clearPreferredReadReplica(TopicPartition tp) {
           final TopicPartitionState topicPartitionState = assignedStateOrNull(tp);
           if (topicPartitionState == null) {
               return Optional.empty();
           } else {
               return topicPartitionState.clearPreferredReadReplica();
           }
       }
   ```
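
The check-then-act gap discussed above can be replayed deterministically with a small stand-in. This is a minimal sketch under stated assumptions, not the Kafka implementation: `MiniSubscriptionState`, `clearStrict` and `clearTolerant` are hypothetical names, partitions are plain strings, and the "other thread" is simulated by calling `unassign` between the check and the clear. The tolerant variant mirrors the shape of the suggestion above, doing the lookup and the clear inside one `synchronized` method.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for SubscriptionState (not the Kafka class).
// Partitions are plain strings and only the preferred read replica is tracked.
class MiniSubscriptionState {
    private static final class PartitionState {
        Integer preferredReadReplica; // null while no replica is set
    }

    private final Map<String, PartitionState> assignment = new HashMap<>();

    public synchronized void assign(String tp) {
        assignment.putIfAbsent(tp, new PartitionState());
    }

    public synchronized void unassign(String tp) {
        assignment.remove(tp);
    }

    public synchronized boolean isAssigned(String tp) {
        return assignment.containsKey(tp);
    }

    public synchronized void updatePreferredReadReplica(String tp, int nodeId) {
        stateOrThrow(tp).preferredReadReplica = nodeId;
    }

    // Strict variant: throws if tp is not assigned, so callers must check first.
    public synchronized Optional<Integer> clearStrict(String tp) {
        return clear(stateOrThrow(tp));
    }

    // Tolerant variant: lookup and clear happen under the same lock.
    public synchronized Optional<Integer> clearTolerant(String tp) {
        PartitionState state = assignment.get(tp);
        if (state == null) {
            return Optional.empty();
        } else {
            return clear(state);
        }
    }

    // Only called from synchronized methods, so the lock is already held.
    private PartitionState stateOrThrow(String tp) {
        PartitionState state = assignment.get(tp);
        if (state == null) {
            throw new IllegalStateException("No current assignment for partition " + tp);
        }
        return state;
    }

    private static Optional<Integer> clear(PartitionState state) {
        Optional<Integer> removed = Optional.ofNullable(state.preferredReadReplica);
        state.preferredReadReplica = null;
        return removed;
    }

    public static void main(String[] args) {
        MiniSubscriptionState subscriptions = new MiniSubscriptionState();
        subscriptions.assign("tp0");
        subscriptions.updatePreferredReadReplica("tp0", 1);

        // Replay the problematic interleaving on a single thread.
        boolean assigned = subscriptions.isAssigned("tp0"); // the filter step passes
        subscriptions.unassign("tp0");                      // assignment changes in between
        try {
            if (assigned) {
                subscriptions.clearStrict("tp0");           // the clear step now fails
            }
        } catch (IllegalStateException e) {
            System.out.println("strict: " + e.getMessage());
        }
        System.out.println("tolerant: " + subscriptions.clearTolerant("tp0"));
    }
}
```

Each method in the sketch is individually synchronized, yet the filter-then-clear sequence is not atomic as a whole. That is why moving the null check into the clearing method closes the window, while an external `isAssigned` filter only narrows it.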





[GitHub] [kafka] philipnee commented on a diff in pull request #13023: KAFKA-14532: Correctly handle failed fetch when partitions unassigned

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #13023:
URL: https://github.com/apache/kafka/pull/13023#discussion_r1053900444


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -620,7 +620,12 @@ public synchronized Optional<Integer> preferredReadReplica(TopicPartition tp, lo
      * @return the removed preferred read replica if set, None otherwise.
      */
     public synchronized Optional<Integer> clearPreferredReadReplica(TopicPartition tp) {
-        return assignedState(tp).clearPreferredReadReplica();
+        final TopicPartitionState topicPartitionState = assignedStateOrNull(tp);
+        if (topicPartitionState == null) {
+            return Optional.empty();
+        } else {

Review Comment:
   The `if` branch already returns, so the `else` isn't needed.
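
For illustration only, the two control-flow shapes under discussion behave identically. The sketch below uses a hypothetical `EarlyReturnSketch` class with a plain map instead of `TopicPartitionState`, so it shows the structure of the branch rather than the actual Kafka code; a missing map entry stands in for the "no state" case.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical class used only to compare the two branch styles.
class EarlyReturnSketch {
    private final Map<String, Integer> replicaByPartition = new HashMap<>();

    synchronized void set(String tp, int nodeId) {
        replicaByPartition.put(tp, nodeId);
    }

    // Explicit if/else, the shape used in the diff above.
    synchronized Optional<Integer> clearWithElse(String tp) {
        Integer current = replicaByPartition.get(tp);
        if (current == null) {
            return Optional.empty();
        } else {
            replicaByPartition.remove(tp);
            return Optional.of(current);
        }
    }

    // Guard clause: the early return makes the else redundant.
    synchronized Optional<Integer> clearWithGuard(String tp) {
        Integer current = replicaByPartition.get(tp);
        if (current == null) {
            return Optional.empty();
        }
        replicaByPartition.remove(tp);
        return Optional.of(current);
    }
}
```

Which form to use is a readability choice; consistency with neighbouring methods is a reasonable tie-breaker.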





[GitHub] [kafka] dajac commented on a diff in pull request #13023: KAFKA-14532: Correctly handle failed fetch when partitions unassigned

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #13023:
URL: https://github.com/apache/kafka/pull/13023#discussion_r1054106713


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -620,7 +620,12 @@ public synchronized Optional<Integer> preferredReadReplica(TopicPartition tp, lo
      * @return the removed preferred read replica if set, None otherwise.
      */
     public synchronized Optional<Integer> clearPreferredReadReplica(TopicPartition tp) {
-        return assignedState(tp).clearPreferredReadReplica();
+        final TopicPartitionState topicPartitionState = assignedStateOrNull(tp);
+        if (topicPartitionState == null) {
+            return Optional.empty();
+        } else {

Review Comment:
   Sorry, I did not see your comment before merging. I agree that we could remove the `else`, but keeping it is also fine: other methods in this class are structured the same way.





[GitHub] [kafka] dajac commented on pull request #13023: KAFKA-14532: Correctly handle failed fetch when partitions unassigned

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #13023:
URL: https://github.com/apache/kafka/pull/13023#issuecomment-1359310514

   The bug was introduced in https://github.com/apache/kafka/pull/12956.




[GitHub] [kafka] dajac commented on pull request #13023: KAFKA-14532: Correctly handle failed fetch when partitions unassigned

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #13023:
URL: https://github.com/apache/kafka/pull/13023#issuecomment-1358415058

   I can take a look at it tomorrow. We should mark it as a blocker for 3.4. cc @ableegoldman




[GitHub] [kafka] dajac merged pull request #13023: KAFKA-14532: Correctly handle failed fetch when partitions unassigned

Posted by GitBox <gi...@apache.org>.
dajac merged PR #13023:
URL: https://github.com/apache/kafka/pull/13023




[GitHub] [kafka] dajac commented on pull request #13023: KAFKA-14532: Correctly handle failed fetch when partitions unassigned

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #13023:
URL: https://github.com/apache/kafka/pull/13023#issuecomment-1361005539

   Merged to trunk, 3.4 and 3.3.




[GitHub] [kafka] lucasbru commented on pull request #13023: KAFKA-14532: Correctly handle failed fetch when partitions unassigned

Posted by GitBox <gi...@apache.org>.
lucasbru commented on PR #13023:
URL: https://github.com/apache/kafka/pull/13023#issuecomment-1358395338

   @artemlivshits could you have a look? Thanks!

