You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/07 21:57:58 UTC

[GitHub] [kafka] philipnee opened a new pull request, #12603: Pausing partition to prevent duplication

philipnee opened a new pull request, #12603:
URL: https://github.com/apache/kafka/pull/12603

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r967027203


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             joinPrepareTimer.update();
         }
 
+        final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);
+
         // async commit offsets prior to rebalance if auto-commit enabled
         // and there is no in-flight offset commit request
-        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
-            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        if (autoCommitEnabled) {
+            pausePartitions(partitionsToRevoke);

Review Comment:
   I think what Jason suggested, is change from "committing offset subscribed partitions -> revoking", to "revoking -> "committing offset subscribed partitions(including revoked partitions)". I don't think it is a good idea because that would open a door to allow consumer committing offsets for partitions it doesn't own. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968897177


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // Revoke all partitions
+        if (protocol == RebalanceProtocol.EAGER) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // only revoke those partitions that are not in the subscription any more.
+        if (protocol == RebalanceProtocol.COOPERATIVE) {
+            // Delay the partition revocation because we don't revoke the already owned partitions
+            return partitions;
+        }
+
+        log.warn("Invalid protocol: {}. No partition will be revoked.", protocol);

Review Comment:
   I think this code is dead? Why don't we use a `switch` like we had before? Then the compiler can help us ensure we handle new cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r967027203


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             joinPrepareTimer.update();
         }
 
+        final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);
+
         // async commit offsets prior to rebalance if auto-commit enabled
         // and there is no in-flight offset commit request
-        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
-            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        if (autoCommitEnabled) {
+            pausePartitions(partitionsToRevoke);

Review Comment:
   I think what Jason suggested, is change from 
   "committing offset subscribed partitions -> revoking", 
   to "revoking -> committing offset subscribed partitions(including revoked partitions)". 
   
   I don't think it is a good idea because that would open a door to allow consumer committing offsets for partitions it doesn't own. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968965173


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        switch (protocol) {
+            case EAGER:
+                partitions.addAll(subscriptions.assignedPartitions());
+                break;
+
+            case COOPERATIVE:
+                // Delay the partition revocation because we don't revoke the already owned partitions

Review Comment:
   I was looking into the cooperative code path. We revoke the partitions in `onJoinComplete`, so that made me wonder why we don't have the same issue there. In fact, there is no additional offset commit in the current logic, which makes me think that the cooperative logic would already be more prone to duplicate consumption. We don't need to fix this here since it seems to be a pre-existing issue, but I am wondering if the failing system tests also cover cooperative assignment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r967027203


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             joinPrepareTimer.update();
         }
 
+        final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);
+
         // async commit offsets prior to rebalance if auto-commit enabled
         // and there is no in-flight offset commit request
-        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
-            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        if (autoCommitEnabled) {
+            pausePartitions(partitionsToRevoke);

Review Comment:
   I think what Jason suggested, is change from 
   "committing offset for subscribed partitions -> revoking", 
   to "revoking -> committing offset for subscribed partitions **+ revoked partitions**". 
   
   I don't think it is a good idea because that would open a door to allow consumer committing offsets for partitions it doesn't own. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969137654


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -2283,6 +2322,37 @@ public void testRestOffsetsAuthorizationFailure() {
         assertEquals(5, subscriptions.position(tp0).offset);
     }
 
+    @Test
+    public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 100);
+        subscriptions.seek(tp0, 100);
+        subscriptions.seek(tp0, 100);
+        assertEquals(100, subscriptions.position(tp0).offset);
+
+        assertTrue(subscriptions.isFetchable(tp0)); // because tp is paused

Review Comment:
   oops, this is a typo.  Probably some residual after modifying the tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969115062


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -2283,6 +2322,37 @@ public void testRestOffsetsAuthorizationFailure() {
         assertEquals(5, subscriptions.position(tp0).offset);
     }
 
+    @Test
+    public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 100);
+        subscriptions.seek(tp0, 100);
+        subscriptions.seek(tp0, 100);

Review Comment:
   Any reason we seek tp0 to offset 100 three times?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -769,6 +773,7 @@ private static class TopicPartitionState {
         private Long logStartOffset; // the log start offset
         private Long lastStableOffset;
         private boolean paused;  // whether this partition has been paused by the user
+        private boolean consumable;

Review Comment:
   I like the name: `pendingRevocation`, too.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -2283,6 +2322,37 @@ public void testRestOffsetsAuthorizationFailure() {
         assertEquals(5, subscriptions.position(tp0).offset);
     }
 
+    @Test
+    public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 100);
+        subscriptions.seek(tp0, 100);
+        subscriptions.seek(tp0, 100);
+        assertEquals(100, subscriptions.position(tp0).offset);
+
+        assertTrue(subscriptions.isFetchable(tp0)); // because tp is paused

Review Comment:
   I don't understand the comment here. Where do we pause tp0? and it is fetchable now, right?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        switch (protocol) {
+            case EAGER:
+                partitions.addAll(subscriptions.assignedPartitions());
+                break;
+
+            case COOPERATIVE:
+                // Delay the partition revocation because we don't revoke the already owned partitions

Review Comment:
   Agree that we handle the cooperative issue separately. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969020626


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -935,6 +941,9 @@ private boolean isPaused() {
             return paused;
         }
 
+        private boolean isPendingRevocation() {

Review Comment:
   nit: do we need this? Is it used anywhere?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12603: Pausing partition to prevent duplication

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r966254369


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             joinPrepareTimer.update();
         }
 
+        final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);

Review Comment:
   Do we need to recompute the partitions right before the revocation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968910809


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             joinPrepareTimer.update();
         }
 
+        final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);

Review Comment:
   i think it makes sense.  We probably can compute the partition during the revocation due to the nature of cooperative protocol.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968922937


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // Revoke all partitions
+        if (protocol == RebalanceProtocol.EAGER) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // only revoke those partitions that are not in the subscription any more.
+        if (protocol == RebalanceProtocol.COOPERATIVE) {
+            // Delay the partition revocation because we don't revoke the already owned partitions
+            return partitions;
+        }
+
+        log.warn("Invalid protocol: {}. No partition will be revoked.", protocol);
+        return partitions;
+    }
+
+    private Optional<Exception> revokePartitions(SortedSet<TopicPartition> partitions, int generation, String memberId) {
+
         // the generation / member-id can possibly be reset by the heartbeat thread
         // upon getting errors or heartbeat timeouts; in this case whatever is previously
         // owned partitions would be lost, we should trigger the callback and cleanup the assignment;
         // otherwise we can proceed normally and revoke the partitions depending on the protocol,
         // and in that case we should only change the assignment AFTER the revoke callback is triggered
         // so that users can still access the previously owned partitions to commit offsets etc.
-        Exception exception = null;
-        final SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR);
+        Optional<Exception> exception = Optional.empty();
         if (generation == Generation.NO_GENERATION.generationId ||
-            memberId.equals(Generation.NO_GENERATION.memberId)) {
-            revokedPartitions.addAll(subscriptions.assignedPartitions());
-
-            if (!revokedPartitions.isEmpty()) {
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            if (!partitions.isEmpty()) {
                 log.info("Giving away all assigned partitions as lost since generation/memberID has been reset," +
-                    "indicating that consumer is in old state or no longer part of the group");
-                exception = invokePartitionsLost(revokedPartitions);
-
+                        "indicating that consumer is in old state or no longer part of the group");
+                exception = Optional.ofNullable(invokePartitionsLost(partitions));
                 subscriptions.assignFromSubscribed(Collections.emptySet());
             }
         } else {
             switch (protocol) {
                 case EAGER:
-                    // revoke all partitions
-                    revokedPartitions.addAll(subscriptions.assignedPartitions());
-                    exception = invokePartitionsRevoked(revokedPartitions);
-
+                    exception = Optional.ofNullable(invokePartitionsRevoked(partitions));
                     subscriptions.assignFromSubscribed(Collections.emptySet());
 
                     break;
 
                 case COOPERATIVE:
-                    // only revoke those partitions that are not in the subscription any more.
                     Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
-                    revokedPartitions.addAll(ownedPartitions.stream()
-                        .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
-                        .collect(Collectors.toSet()));
-
-                    if (!revokedPartitions.isEmpty()) {
-                        exception = invokePartitionsRevoked(revokedPartitions);
+                    partitions.addAll(ownedPartitions.stream()
+                            .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
+                            .collect(Collectors.toSet()));
 
-                        ownedPartitions.removeAll(revokedPartitions);
+                    if (!partitions.isEmpty()) {
+                        exception = Optional.ofNullable(invokePartitionsRevoked(partitions));
+                        ownedPartitions.removeAll(partitions);
                         subscriptions.assignFromSubscribed(ownedPartitions);
                     }
-
                     break;
             }
         }
 
-        isLeader = false;
-        subscriptions.resetGroupSubscription();
-        joinPrepareTimer = null;
-        autoCommitOffsetRequestFuture = null;
-        timer.update();
+        return exception;
+    }
 
-        if (exception != null) {
-            throw new KafkaException("User rebalance callback throws an error", exception);
-        }
-        return true;
+    private void markPartitionsUnconsumable(final Set<TopicPartition> partitions) {

Review Comment:
   nit: update name



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());

Review Comment:
   Hmm, these are lost partitions, not ones that we will revoke. We don't need to pause them since any offset commits will get rejected anyway. I feel like we could handle this case in `revokePartitions` as in the old logic. Then we just have the "eager" partitions that are considered here and we could rename this to `eagerPartitionsToRevoke` or something like that.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // Revoke all partitions
+        if (protocol == RebalanceProtocol.EAGER) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // only revoke those partitions that are not in the subscription any more.
+        if (protocol == RebalanceProtocol.COOPERATIVE) {
+            // Delay the partition revocation because we don't revoke the already owned partitions
+            return partitions;
+        }
+
+        log.warn("Invalid protocol: {}. No partition will be revoked.", protocol);
+        return partitions;
+    }
+
+    private Optional<Exception> revokePartitions(SortedSet<TopicPartition> partitions, int generation, String memberId) {
+
         // the generation / member-id can possibly be reset by the heartbeat thread
         // upon getting errors or heartbeat timeouts; in this case whatever is previously
         // owned partitions would be lost, we should trigger the callback and cleanup the assignment;
         // otherwise we can proceed normally and revoke the partitions depending on the protocol,
         // and in that case we should only change the assignment AFTER the revoke callback is triggered
         // so that users can still access the previously owned partitions to commit offsets etc.
-        Exception exception = null;
-        final SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR);
+        Optional<Exception> exception = Optional.empty();
         if (generation == Generation.NO_GENERATION.generationId ||
-            memberId.equals(Generation.NO_GENERATION.memberId)) {
-            revokedPartitions.addAll(subscriptions.assignedPartitions());
-
-            if (!revokedPartitions.isEmpty()) {
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            if (!partitions.isEmpty()) {
                 log.info("Giving away all assigned partitions as lost since generation/memberID has been reset," +
-                    "indicating that consumer is in old state or no longer part of the group");
-                exception = invokePartitionsLost(revokedPartitions);
-
+                        "indicating that consumer is in old state or no longer part of the group");
+                exception = Optional.ofNullable(invokePartitionsLost(partitions));
                 subscriptions.assignFromSubscribed(Collections.emptySet());
             }
         } else {
             switch (protocol) {
                 case EAGER:
-                    // revoke all partitions
-                    revokedPartitions.addAll(subscriptions.assignedPartitions());
-                    exception = invokePartitionsRevoked(revokedPartitions);
-
+                    exception = Optional.ofNullable(invokePartitionsRevoked(partitions));
                     subscriptions.assignFromSubscribed(Collections.emptySet());
 
                     break;
 
                 case COOPERATIVE:
-                    // only revoke those partitions that are not in the subscription any more.
                     Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
-                    revokedPartitions.addAll(ownedPartitions.stream()
-                        .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
-                        .collect(Collectors.toSet()));
-
-                    if (!revokedPartitions.isEmpty()) {
-                        exception = invokePartitionsRevoked(revokedPartitions);
+                    partitions.addAll(ownedPartitions.stream()

Review Comment:
   Perhaps we could put this in a helper called `revokeUnsubscribedPartitions` and add a comment:
   > For the cooperative strategy, partitions are usually revoked in `onJoinComplete` when the coordinator explicitly requests revocation in order to assign them to another group member. However, any partitions of topics which are no long subscribed can be revoked immediately before rejoining the group.
   
   If we do that, perhaps there's no need to mutate `partitions`. We're already building a new collection here, so we can just make it a new SortedSet.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        switch (protocol) {
+            case EAGER:
+                partitions.addAll(subscriptions.assignedPartitions());
+                break;
+
+            case COOPERATIVE:
+                // Delay the partition revocation because we don't revoke the already owned partitions

Review Comment:
   I was looking into the cooperative code path. We revoke the partition in `onJoinComplete`, so that made me wonder why we don't have the same issue there. In fact, there is no additional offset commit in the current logic, which makes me thing that the cooperative logic would already be more prone to duplicate consumption. We don't need to fix this here since it seems to be a pre-existing issue, but I am wondering if the failing system tests also cover cooperative assignment?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // Revoke all partitions
+        if (protocol == RebalanceProtocol.EAGER) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // only revoke those partitions that are not in the subscription any more.
+        if (protocol == RebalanceProtocol.COOPERATIVE) {
+            // Delay the partition revocation because we don't revoke the already owned partitions
+            return partitions;
+        }
+
+        log.warn("Invalid protocol: {}. No partition will be revoked.", protocol);
+        return partitions;
+    }
+
+    private Optional<Exception> revokePartitions(SortedSet<TopicPartition> partitions, int generation, String memberId) {
+
         // the generation / member-id can possibly be reset by the heartbeat thread
         // upon getting errors or heartbeat timeouts; in this case whatever is previously
         // owned partitions would be lost, we should trigger the callback and cleanup the assignment;
         // otherwise we can proceed normally and revoke the partitions depending on the protocol,
         // and in that case we should only change the assignment AFTER the revoke callback is triggered
         // so that users can still access the previously owned partitions to commit offsets etc.
-        Exception exception = null;
-        final SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR);
+        Optional<Exception> exception = Optional.empty();
         if (generation == Generation.NO_GENERATION.generationId ||
-            memberId.equals(Generation.NO_GENERATION.memberId)) {
-            revokedPartitions.addAll(subscriptions.assignedPartitions());
-
-            if (!revokedPartitions.isEmpty()) {
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            if (!partitions.isEmpty()) {
                 log.info("Giving away all assigned partitions as lost since generation/memberID has been reset," +
-                    "indicating that consumer is in old state or no longer part of the group");
-                exception = invokePartitionsLost(revokedPartitions);
-
+                        "indicating that consumer is in old state or no longer part of the group");
+                exception = Optional.ofNullable(invokePartitionsLost(partitions));
                 subscriptions.assignFromSubscribed(Collections.emptySet());
             }
         } else {
             switch (protocol) {
                 case EAGER:
-                    // revoke all partitions
-                    revokedPartitions.addAll(subscriptions.assignedPartitions());
-                    exception = invokePartitionsRevoked(revokedPartitions);
-
+                    exception = Optional.ofNullable(invokePartitionsRevoked(partitions));
                     subscriptions.assignFromSubscribed(Collections.emptySet());
 
                     break;
 
                 case COOPERATIVE:
-                    // only revoke those partitions that are not in the subscription any more.
                     Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
-                    revokedPartitions.addAll(ownedPartitions.stream()
-                        .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
-                        .collect(Collectors.toSet()));
-
-                    if (!revokedPartitions.isEmpty()) {
-                        exception = invokePartitionsRevoked(revokedPartitions);
+                    partitions.addAll(ownedPartitions.stream()
+                            .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
+                            .collect(Collectors.toSet()));
 
-                        ownedPartitions.removeAll(revokedPartitions);
+                    if (!partitions.isEmpty()) {
+                        exception = Optional.ofNullable(invokePartitionsRevoked(partitions));
+                        ownedPartitions.removeAll(partitions);
                         subscriptions.assignFromSubscribed(ownedPartitions);
                     }
-
                     break;
             }
         }
 
-        isLeader = false;
-        subscriptions.resetGroupSubscription();
-        joinPrepareTimer = null;
-        autoCommitOffsetRequestFuture = null;
-        timer.update();
+        return exception;
+    }
 
-        if (exception != null) {
-            throw new KafkaException("User rebalance callback throws an error", exception);
-        }
-        return true;
+    private void markPartitionsUnconsumable(final Set<TopicPartition> partitions) {
+        // KAFKA-14196 for more detail, we pause the partition from consumption to prevent duplicated

Review Comment:
   nit: We can leave off the jira reference if we provide enough documentation to understand the issue. I would say something like this:
   > When asynchronously committing offsets prior to the revocation of a set of partitions, there will be a window of time between when the offset commit is sent and when it returns and revocation completes. It is possible for pending fetches for these partitions to return during this time, which means the application's position may get ahead of the committed position prior to revocation. This can cause duplicate consumption. To prevent this, we mark the partitions as "pending revocation," which stops the `Fetcher` from sending new fetches or returning data from previous fetches to the user. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968822443


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // Revoke all partitions
+        if (protocol == RebalanceProtocol.EAGER) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // only revoke those partitions that are not in the subscription any more.
+        if (protocol == RebalanceProtocol.COOPERATIVE) {
+            // Delay the partition revocation because we don't revoke the already owned partitions

Review Comment:
   leaving this purely for documentation purpose.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969021352


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -853,12 +850,38 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         autoCommitOffsetRequestFuture = null;
         timer.update();
 
-        if (exception != null) {
-            throw new KafkaException("User rebalance callback throws an error", exception);
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
         }
         return true;
     }
 
+    private SortedSet<TopicPartition> eagerPartitionsToRevoke(RebalanceProtocol protocol) {

Review Comment:
   nit: not using this anymore



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969035562


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -2283,6 +2307,37 @@ public void testRestOffsetsAuthorizationFailure() {
         assertEquals(5, subscriptions.position(tp0).offset);
     }
 
+    @Test
+    public void testPendingRevacationPartitionFetching() {

Review Comment:
   nit: Revocation is misspelled
   
   I did not find the name very clear. It looks like the main difference between this and `testFetchingPendingPartitions` is that this method tests that the pending state gets reset after reassignment? Perhaps the name should reflect that?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -272,6 +272,30 @@ public void testFetchNormal() {
         }
     }
 
+    @Test
+    public void testFetchingPendingPartitions() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+        assertEquals(4L, subscriptions.position(tp0).offset); // this is the next fetching position
+
+        // mark partition unfetchable
+        subscriptions.markPendingRevocation(singleton(tp0));

Review Comment:
   Another scenario is that we already have the fetch inflight when we mark pending revocation. Can we test that as well?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java:
##########
@@ -256,6 +256,15 @@ public void partitionPause() {
         assertTrue(state.isFetchable(tp0));
     }
 
+    @Test
+    public void testMarkingPartitionPending() {
+        state.assignFromUser(singleton(tp0));
+        state.seek(tp0, 100);
+        assertTrue(state.isFetchable(tp0));
+        state.markPendingRevocation(singleton(tp0));
+        assertFalse(state.isFetchable(tp0));

Review Comment:
   Perhaps we can also assert `isPaused` is false?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969136963


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -2283,6 +2322,37 @@ public void testRestOffsetsAuthorizationFailure() {
         assertEquals(5, subscriptions.position(tp0).offset);
     }
 
+    @Test
+    public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 100);
+        subscriptions.seek(tp0, 100);
+        subscriptions.seek(tp0, 100);
+        assertEquals(100, subscriptions.position(tp0).offset);
+
+        assertTrue(subscriptions.isFetchable(tp0)); // because tp is paused

Review Comment:
   I'll clean it up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968970980


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        switch (protocol) {
+            case EAGER:
+                partitions.addAll(subscriptions.assignedPartitions());
+                break;
+
+            case COOPERATIVE:
+                // Delay the partition revocation because we don't revoke the already owned partitions

Review Comment:
   I don't think we have system test covering the cooperative strategy, so it is quite possible that the duplication here.  I think it would be good to update these tests with cooperative strategy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12603:
URL: https://github.com/apache/kafka/pull/12603#issuecomment-1243677992

   @philipnee Thanks for the PR. Could you please update the description to explain the change?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r966583573


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             joinPrepareTimer.update();
         }
 
+        final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);
+
         // async commit offsets prior to rebalance if auto-commit enabled
         // and there is no in-flight offset commit request
-        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
-            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        if (autoCommitEnabled) {
+            pausePartitions(partitionsToRevoke);

Review Comment:
   but if we don't commit these offsets, i wonder if that breaks the poll contract, i.e. we need to commit the acked data from the previous poll.  Note that, I believe some of the data could already be returned to the client from the previous poll loop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r966582411


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // Revoke all partitions
+        if (protocol == RebalanceProtocol.EAGER) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // only revoke those partitions that are not in the subscription any more.
+        if (protocol == RebalanceProtocol.COOPERATIVE) {
+            Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
+            partitions.addAll(ownedPartitions.stream()
+                    .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
+                    .collect(Collectors.toSet()));
+            return partitions;
+        }
+
+        log.debug("Invalid protocol: {}. No partition will be revoked.", protocol);
+        return partitions;
+    }
+
+    private void pausePartitions(Set<TopicPartition> partitions) {
+        // KAFKA-14196 for more detail, we pause the partition from consumption to prevent duplicated
+        //  data returned by the consumer poll loop.  Without pausing the partitions, the consumer will move forward
+        //  returning the data w/o committing them.  And the progress will be lost once the partition is revoked.
+        //  This only applies to autocommits, as we expect user to handle the offsets menually during the partition
+        //  revocation.
+
+        log.debug("Pausing partitions {} before onJoinPrepare", partitions);
+        partitions.forEach(tp -> subscriptionState().pause(tp));

Review Comment:
   Good call there, I think @guozhangwang originally propose marking these partitions using a different flag, to achieve what pause does.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r966581908


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             joinPrepareTimer.update();
         }
 
+        final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);

Review Comment:
   Right, because if the consumer is assigned to a new partition, we will need to revoke them for eager protocol. (one case i can think of).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r966512403


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // Revoke all partitions
+        if (protocol == RebalanceProtocol.EAGER) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // only revoke those partitions that are not in the subscription any more.
+        if (protocol == RebalanceProtocol.COOPERATIVE) {
+            Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
+            partitions.addAll(ownedPartitions.stream()
+                    .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
+                    .collect(Collectors.toSet()));
+            return partitions;
+        }
+
+        log.debug("Invalid protocol: {}. No partition will be revoked.", protocol);
+        return partitions;
+    }
+
+    private void pausePartitions(Set<TopicPartition> partitions) {
+        // KAFKA-14196 for more detail, we pause the partition from consumption to prevent duplicated
+        //  data returned by the consumer poll loop.  Without pausing the partitions, the consumer will move forward
+        //  returning the data w/o committing them.  And the progress will be lost once the partition is revoked.
+        //  This only applies to autocommits, as we expect user to handle the offsets menually during the partition
+        //  revocation.
+
+        log.debug("Pausing partitions {} before onJoinPrepare", partitions);
+        partitions.forEach(tp -> subscriptionState().pause(tp));

Review Comment:
   I have mixed feelings about reusing the pause mechanism here. On the one hand, it does what we want. On the other hand, the pause state can be mutated by the user. What if the user calls `resume()` on a partition that we paused internally? Sounds crazy perhaps, but I think I'd rather have a mechanism that can only be accessed internally for stuff like this.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             joinPrepareTimer.update();
         }
 
+        final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);
+
         // async commit offsets prior to rebalance if auto-commit enabled
         // and there is no in-flight offset commit request
-        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
-            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        if (autoCommitEnabled) {
+            pausePartitions(partitionsToRevoke);

Review Comment:
   It seems like this bug (and some of the complexity in this patch) is due to the fact that we do the auto-commit prior to revoking partitions. I wonder if that is really necessary. If we revoke first, then the partitions would be removed from `SubscriptionState` and we wouldn't have to worry about fetches for these partitions returning. Could that work as well?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             joinPrepareTimer.update();
         }
 
+        final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);

Review Comment:
   Is your concern that the subscription could change in between the time that we  pause the partitions and the time that the revocation callback is triggered?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r966583573


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             joinPrepareTimer.update();
         }
 
+        final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);
+
         // async commit offsets prior to rebalance if auto-commit enabled
         // and there is no in-flight offset commit request
-        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
-            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        if (autoCommitEnabled) {
+            pausePartitions(partitionsToRevoke);

Review Comment:
   but if we don't commit these offsets, i wonder if that breaks the poll contract, i.e. we need to commit the acked data.  Note that, I believe some of the data could already be returned to the client from the previous poll loop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969021668


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -853,12 +850,38 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         autoCommitOffsetRequestFuture = null;
         timer.update();
 
-        if (exception != null) {
-            throw new KafkaException("User rebalance callback throws an error", exception);
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
         }
         return true;
     }
 
+    private SortedSet<TopicPartition> eagerPartitionsToRevoke(RebalanceProtocol protocol) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (protocol != RebalanceProtocol.EAGER) {
+            return partitions;
+        }
+
+        partitions.addAll(subscriptions.assignedPartitions());
+        return partitions;
+    }
+
+    private void markPendingPartitions() {

Review Comment:
   How about `maybeMarkPartitionsPendingRevocation`? Otherwise it's a little unclear what exactly is pending.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
philipnee commented on PR #12603:
URL: https://github.com/apache/kafka/pull/12603#issuecomment-1243094204

   Thanks @showuon @hachikuji for the inputs, I just made this PR reviewable, so please review it if possible :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969124759


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        switch (protocol) {
+            case EAGER:
+                partitions.addAll(subscriptions.assignedPartitions());
+                break;
+
+            case COOPERATIVE:
+                // Delay the partition revocation because we don't revoke the already owned partitions

Review Comment:
   I filed this issue: https://issues.apache.org/jira/browse/KAFKA-14224.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji merged pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
hachikuji merged PR #12603:
URL: https://github.com/apache/kafka/pull/12603


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968965173


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        switch (protocol) {
+            case EAGER:
+                partitions.addAll(subscriptions.assignedPartitions());
+                break;
+
+            case COOPERATIVE:
+                // Delay the partition revocation because we don't revoke the already owned partitions

Review Comment:
   I was looking into the cooperative code path. We revoke the partition in `onJoinComplete`, so that made me wonder why we don't have the same issue there. In fact, there is no additional offset commit in the current logic, which makes me think that the cooperative logic would already be more prone to duplicate consumption. We don't need to fix this here since it seems to be a pre-existing issue, but I am wondering if the failing system tests also cover cooperative assignment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968754388


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -769,6 +773,7 @@ private static class TopicPartitionState {
         private Long logStartOffset; // the log start offset
         private Long lastStableOffset;
         private boolean paused;  // whether this partition has been paused by the user
+        private boolean consumable;

Review Comment:
   How about calling this `pendingRevocation` or something like that? That might make the usage clearer.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,94 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // Revoke all partitions
+        if (protocol == RebalanceProtocol.EAGER) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // only revoke those partitions that are not in the subscription any more.
+        if (protocol == RebalanceProtocol.COOPERATIVE) {
+            Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
+            partitions.addAll(ownedPartitions.stream()
+                    .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
+                    .collect(Collectors.toSet()));
+            return partitions;
+        }
+
+        log.debug("Invalid protocol: {}. No partition will be revoked.", protocol);
+        return partitions;
+    }
+
+    private Optional<Exception> revokePartitions(SortedSet<TopicPartition> partitions, int generation, String memberId) {
+
         // the generation / member-id can possibly be reset by the heartbeat thread
         // upon getting errors or heartbeat timeouts; in this case whatever is previously
         // owned partitions would be lost, we should trigger the callback and cleanup the assignment;
         // otherwise we can proceed normally and revoke the partitions depending on the protocol,
         // and in that case we should only change the assignment AFTER the revoke callback is triggered
         // so that users can still access the previously owned partitions to commit offsets etc.
-        Exception exception = null;
-        final SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR);
+        Optional<Exception> exception = Optional.empty();
         if (generation == Generation.NO_GENERATION.generationId ||
-            memberId.equals(Generation.NO_GENERATION.memberId)) {
-            revokedPartitions.addAll(subscriptions.assignedPartitions());
-
-            if (!revokedPartitions.isEmpty()) {
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            if (!partitions.isEmpty()) {
                 log.info("Giving away all assigned partitions as lost since generation/memberID has been reset," +
-                    "indicating that consumer is in old state or no longer part of the group");
-                exception = invokePartitionsLost(revokedPartitions);
-
+                        "indicating that consumer is in old state or no longer part of the group");
+                exception = Optional.ofNullable(invokePartitionsLost(partitions));
                 subscriptions.assignFromSubscribed(Collections.emptySet());
             }
         } else {
             switch (protocol) {
                 case EAGER:
-                    // revoke all partitions
-                    revokedPartitions.addAll(subscriptions.assignedPartitions());
-                    exception = invokePartitionsRevoked(revokedPartitions);
-
+                    exception = Optional.ofNullable(invokePartitionsRevoked(partitions));
                     subscriptions.assignFromSubscribed(Collections.emptySet());
 
                     break;
 
                 case COOPERATIVE:
-                    // only revoke those partitions that are not in the subscription any more.
                     Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
-                    revokedPartitions.addAll(ownedPartitions.stream()
-                        .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
-                        .collect(Collectors.toSet()));
-
-                    if (!revokedPartitions.isEmpty()) {
-                        exception = invokePartitionsRevoked(revokedPartitions);
-
-                        ownedPartitions.removeAll(revokedPartitions);
+                    if (!partitions.isEmpty()) {
+                        exception = Optional.ofNullable(invokePartitionsRevoked(partitions));
+                        ownedPartitions.removeAll(partitions);
                         subscriptions.assignFromSubscribed(ownedPartitions);
                     }
-
                     break;
             }
         }
 
-        isLeader = false;
-        subscriptions.resetGroupSubscription();
-        joinPrepareTimer = null;
-        autoCommitOffsetRequestFuture = null;
-        timer.update();
+        return exception;
+    }
 
-        if (exception != null) {
-            throw new KafkaException("User rebalance callback throws an error", exception);
-        }
-        return true;
+    private void markPartitionsUnconsumable(final Set<TopicPartition> partitions) {
+        // KAFKA-14196 for more detail, we pause the partition from consumption to prevent duplicated
+        //  data returned by the consumer poll loop.  Without pausing the partitions, the consumer will move forward
+        //  returning the data w/o committing them.  And the progress will be lost once the partition is revoked.
+        //  This only applies to autocommits, as we expect user to handle the offsets menually during the partition
+        //  revocation.
+        log.debug("Marking assigned partitions unconsumable: {}", partitions);
+        partitions.forEach(subscriptions::markUnconsumable);

Review Comment:
   nit: how about we make `SubscriptionState.markUnconsumable` batched? Then we just need to grab the lock once.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             joinPrepareTimer.update();
         }
 
+        final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);

Review Comment:
   I am not sure how we could be assigned new partitions before completing revocation. Is that possible? 
   
   Another case from `getPartitionsToRevoke` below is when the subscription changes in the cooperative protocol. We use the current subscription to filter the partitions that can be revoked immediately before rejoining the group. But for partitions which are no longer subscribed, there is no need to send an offset commit and there is no danger of fetches returning. So I'm wondering if that case can be handled separately. In other words, can we delay computation of this set for the cooperative protocol until we are ready to call `onPartitionsRevoked`? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969009257


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -756,12 +756,11 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             joinPrepareTimer.update();
         }
 
-        final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);
-
+        final SortedSet<TopicPartition> eagerPartitionsToRevoke = eagerPartitionsToRevoke(protocol);

Review Comment:
   Hmm, thinking about this a little more since we're down to just the eager protocol. Since the assignment won't change until the rebalance completes, maybe we do not need to precompute it. In other words, maybe we can restore the original logic in `revokePartitions` and we can change `markPendingPartitions` to something like this:
   
   ```java
   private void maybeMarkPartitionsPendingRevocation() {
     if (protocol == RebalanceProtocol.EAGER) {
       // When asynchronously committing offsets prior to the revocation of a set of partitions, there will be a
       // window of time between when the offset commit is sent and when it returns and revocation completes. It is
       // possible for pending fetches for these partitions to return during this time, which means the application's
       // position may get ahead of the committed position prior to revocation. This can cause duplicate consumption.
       // To prevent this, we mark the partitions as "pending revocation," which stops the Fetcher from sending new
       // fetches or returning data from previous fetches to the user.
       Set<TopicPartition> partitions = subscriptions.assignedPartitions()
       log.debug("Marking assigned partitions pending for revocation: {}", partitions);
       subscriptions.markPendingRevocation(partitions);
     }
   }
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -859,36 +848,49 @@ private Optional<Exception> revokePartitions(SortedSet<TopicPartition> partition
         } else {
             switch (protocol) {
                 case EAGER:
-                    exception = Optional.ofNullable(invokePartitionsRevoked(partitions));
+                    exception = Optional.ofNullable(invokePartitionsRevoked(eagerPartitionsToRevoke));
                     subscriptions.assignFromSubscribed(Collections.emptySet());
-
                     break;
 
                 case COOPERATIVE:
-                    Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
-                    partitions.addAll(ownedPartitions.stream()
-                            .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
-                            .collect(Collectors.toSet()));
-
-                    if (!partitions.isEmpty()) {
-                        exception = Optional.ofNullable(invokePartitionsRevoked(partitions));
-                        ownedPartitions.removeAll(partitions);
-                        subscriptions.assignFromSubscribed(ownedPartitions);
-                    }
+                    exception = revokeUnsubscribedPartitions();
                     break;
             }
         }
 
         return exception;
     }
 
-    private void markPartitionsUnconsumable(final Set<TopicPartition> partitions) {
-        // KAFKA-14196 for more detail, we pause the partition from consumption to prevent duplicated
-        //  data returned by the consumer poll loop.  Without pausing the partitions, the consumer will move forward
-        //  returning the data w/o committing them.  And the progress will be lost once the partition is revoked.
-        //  This only applies to autocommits, as we expect user to handle the offsets menually during the partition
-        //  revocation.
-        log.debug("Marking assigned partitions unconsumable: {}", partitions);
+    private Optional<Exception> revokeUnsubscribedPartitions() {
+        //For the cooperative strategy, partitions are usually revoked in onJoinComplete when the

Review Comment:
   nit: space after `//`
   
   Can we move this comment into the `COOPERATIVE` case in `revokePartitions`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org