You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/09/13 04:21:57 UTC

[kafka] branch 3.2 updated: KAFKA-14196; Do not continue fetching partitions awaiting auto-commit prior to revocation (#12603)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 56baf6448f0 KAFKA-14196; Do not continue fetching partitions awaiting auto-commit prior to revocation (#12603)
56baf6448f0 is described below

commit 56baf6448f0540d30e82c858e8e242094e95df7f
Author: Philip Nee <pn...@confluent.io>
AuthorDate: Mon Sep 12 21:02:13 2022 -0700

    KAFKA-14196; Do not continue fetching partitions awaiting auto-commit prior to revocation (#12603)
    
    When auto-commit is enabled with the "eager" rebalance strategy, the consumer will commit all offsets prior to revocation. Following recent changes, this offset commit is done asynchronously, which means there is an opportunity for fetches to continue returning data to the application. When this happens, the progress is lost following revocation, which results in duplicate consumption. This patch fixes the problem by adding a flag in `SubscriptionState` to ensure that partitions which are awaiting revocation will not continue being fetched.
    
    Reviewers: Luke Chen <sh...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../consumer/internals/ConsumerCoordinator.java    | 17 ++++++
 .../consumer/internals/SubscriptionState.java      | 12 +++-
 .../clients/consumer/internals/FetcherTest.java    | 70 ++++++++++++++++++++++
 .../consumer/internals/SubscriptionStateTest.java  | 10 ++++
 4 files changed, 108 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 9838e7dc8fe..a614504704f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -759,6 +759,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         // async commit offsets prior to rebalance if auto-commit enabled
         // and there is no in-flight offset commit request
         if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            maybeMarkPartitionsPendingRevocation();
             autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
@@ -859,6 +860,22 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         return true;
     }
 
+    private void maybeMarkPartitionsPendingRevocation() {
+        if (protocol != RebalanceProtocol.EAGER) {
+            return;
+        }
+
+        // When asynchronously committing offsets prior to the revocation of a set of partitions, there will be a
+        // window of time between when the offset commit is sent and when it returns and revocation completes. It is
+        // possible for pending fetches for these partitions to return during this time, which means the application's
+        // position may get ahead of the committed position prior to revocation. This can cause duplicate consumption.
+        // To prevent this, we mark the partitions as "pending revocation," which stops the Fetcher from sending new
+        // fetches or returning data from previous fetches to the user.
+        Set<TopicPartition> partitions = subscriptions.assignedPartitions();
+        log.debug("Marking assigned partitions pending for revocation: {}", partitions);
+        subscriptions.markPendingRevocation(partitions);
+    }
+
     @Override
     public void onLeavePrepare() {
         // Save the current Generation, as the hb thread can change it at any time
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 416468d945f..7f8fe8479e9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -738,6 +738,10 @@ public class SubscriptionState {
         assignedState(tp).pause();
     }
 
+    public synchronized void markPendingRevocation(Set<TopicPartition> tps) {
+        tps.forEach(tp -> assignedState(tp).markPendingRevocation());
+    }
+
     public synchronized void resume(TopicPartition tp) {
         assignedState(tp).resume();
     }
@@ -769,6 +773,7 @@ public class SubscriptionState {
         private Long logStartOffset; // the log start offset
         private Long lastStableOffset;
         private boolean paused;  // whether this partition has been paused by the user
+        private boolean pendingRevocation;
         private OffsetResetStrategy resetStrategy;  // the strategy to use if the offset needs resetting
         private Long nextRetryTimeMs;
         private Integer preferredReadReplica;
@@ -777,6 +782,7 @@ public class SubscriptionState {
         
         TopicPartitionState() {
             this.paused = false;
+            this.pendingRevocation = false;
             this.endOffsetRequested = false;
             this.fetchState = FetchStates.INITIALIZING;
             this.position = null;
@@ -966,12 +972,16 @@ public class SubscriptionState {
             this.paused = true;
         }
 
+        private void markPendingRevocation() {
+            this.pendingRevocation = true;
+        }
+
         private void resume() {
             this.paused = false;
         }
 
         private boolean isFetchable() {
-            return !paused && hasValidPosition();
+            return !paused && !pendingRevocation && hasValidPosition();
         }
 
         private void highWatermark(Long highWatermark) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 545bf24b9ec..1ad0ee060a9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -272,6 +272,45 @@ public class FetcherTest {
         }
     }
 
+    @Test
+    public void testInflightFetchOnPendingPartitions() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, fetcher.sendFetches());
+        subscriptions.markPendingRevocation(singleton(tp0));
+
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0));
+        consumerClient.poll(time.timer(0));
+        assertNull(fetchedRecords().get(tp0));
+    }
+
+    @Test
+    public void testFetchingPendingPartitions() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+        assertEquals(4L, subscriptions.position(tp0).offset); // this is the next fetching position
+
+        // mark partition unfetchable
+        subscriptions.markPendingRevocation(singleton(tp0));
+        assertEquals(0, fetcher.sendFetches());
+        consumerClient.poll(time.timer(0));
+        assertFalse(fetcher.hasCompletedFetches());
+        fetchedRecords();
+        assertEquals(4L, subscriptions.position(tp0).offset);
+    }
+
     @Test
     public void testFetchWithNoTopicId() {
         // Should work and default to using old request type.
@@ -2283,6 +2322,37 @@ public class FetcherTest {
         assertEquals(5, subscriptions.position(tp0).offset);
     }
 
+    @Test
+    public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 100);
+        subscriptions.seek(tp0, 100);
+        subscriptions.seek(tp0, 100);
+        assertEquals(100, subscriptions.position(tp0).offset);
+
+        assertTrue(subscriptions.isFetchable(tp0)); // because tp is paused
+
+        subscriptions.markPendingRevocation(singleton(tp0));
+        fetcher.resetOffsetsIfNeeded();
+
+        // once a partition is marked pending, it should not be fetchable
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertFalse(subscriptions.isFetchable(tp0));
+        assertTrue(subscriptions.hasValidPosition(tp0));
+        assertEquals(100, subscriptions.position(tp0).offset);
+
+        subscriptions.seek(tp0, 100);
+        assertEquals(100, subscriptions.position(tp0).offset);
+
+        // reassignment should enable fetching of the same partition
+        subscriptions.unsubscribe();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 100);
+        assertEquals(100, subscriptions.position(tp0).offset);
+        assertTrue(subscriptions.isFetchable(tp0));
+    }
+
     @Test
     public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
         buildFetcher();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index d19234fe8a6..97c61616c75 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -256,6 +256,16 @@ public class SubscriptionStateTest {
         assertTrue(state.isFetchable(tp0));
     }
 
+    @Test
+    public void testMarkingPartitionPending() {
+        state.assignFromUser(singleton(tp0));
+        state.seek(tp0, 100);
+        assertTrue(state.isFetchable(tp0));
+        state.markPendingRevocation(singleton(tp0));
+        assertFalse(state.isFetchable(tp0));
+        assertFalse(state.isPaused(tp0));
+    }
+
     @Test
     public void invalidPositionUpdate() {
         state.subscribe(singleton(topic), rebalanceListener);