Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/09 00:07:00 UTC

[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r966512403


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // Revoke all partitions
+        if (protocol == RebalanceProtocol.EAGER) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // only revoke those partitions that are not in the subscription any more.
+        if (protocol == RebalanceProtocol.COOPERATIVE) {
+            Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
+            partitions.addAll(ownedPartitions.stream()
+                    .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
+                    .collect(Collectors.toSet()));
+            return partitions;
+        }
+
+        log.debug("Invalid protocol: {}. No partition will be revoked.", protocol);
+        return partitions;
+    }
+
+    private void pausePartitions(Set<TopicPartition> partitions) {
+        // KAFKA-14196 for more detail, we pause the partition from consumption to prevent duplicated
+        //  data returned by the consumer poll loop.  Without pausing the partitions, the consumer will move forward
+        //  returning the data w/o committing them.  And the progress will be lost once the partition is revoked.
+        //  This only applies to autocommits, as we expect user to handle the offsets manually during the partition
+        //  revocation.
+
+        log.debug("Pausing partitions {} before onJoinPrepare", partitions);
+        partitions.forEach(tp -> subscriptionState().pause(tp));

Review Comment:
   I have mixed feelings about reusing the pause mechanism here. On the one hand, it does what we want. On the other hand, the pause state can be mutated by the user. What if the user calls `resume()` on a partition that we paused internally? Sounds crazy perhaps, but I think I'd rather have a mechanism that can only be accessed internally for stuff like this.
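One way to read this suggestion is to keep the internal pause in its own state, separate from the user-visible pause set. The following is a minimal sketch under stated assumptions: `PauseStateSketch`, `pauseInternally` and `clearInternalPause` are hypothetical names (not Kafka APIs), and partitions are plain strings rather than `TopicPartition`.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch (not Kafka's SubscriptionState): user pauses and
 * coordinator-internal pauses live in separate sets, so a user resume()
 * cannot undo an internal pause taken before a rebalance.
 */
public class PauseStateSketch {
    // Partitions paused through the public pause()/resume() API.
    private final Set<String> userPaused = new HashSet<>();
    // Partitions held back by the coordinator, e.g. while revocation is pending.
    private final Set<String> internallyPaused = new HashSet<>();

    public void pause(String partition) { userPaused.add(partition); }

    // Only clears the user-level pause; the internal set is untouched.
    public void resume(String partition) { userPaused.remove(partition); }

    // Package-private: intended for the coordinator only, not the consumer API.
    void pauseInternally(Set<String> partitions) { internallyPaused.addAll(partitions); }

    void clearInternalPause(Set<String> partitions) { internallyPaused.removeAll(partitions); }

    // A partition is fetchable only if neither kind of pause applies.
    public boolean isFetchable(String partition) {
        return !userPaused.contains(partition) && !internallyPaused.contains(partition);
    }

    public static void main(String[] args) {
        PauseStateSketch state = new PauseStateSketch();
        state.pauseInternally(Set.of("topic-0"));
        state.resume("topic-0"); // user resume has no effect on the internal pause
        System.out.println(state.isFetchable("topic-0")); // false
        state.clearInternalPause(Set.of("topic-0"));
        System.out.println(state.isFetchable("topic-0")); // true
    }
}
```

Because `resume()` only touches the user set, a partition paused internally stays unfetchable until the coordinator clears it. Whether the real subscription state should carry a flag like this is exactly the design question raised in the comment; the sketch only shows the separation.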



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             joinPrepareTimer.update();
         }
 
+        final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);
+
         // async commit offsets prior to rebalance if auto-commit enabled
         // and there is no in-flight offset commit request
-        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
-            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        if (autoCommitEnabled) {
+            pausePartitions(partitionsToRevoke);

Review Comment:
   It seems like this bug (and some of the complexity in this patch) is due to the fact that we do the auto-commit prior to revoking partitions. I wonder if that is really necessary. If we revoke first, then the partitions would be removed from `SubscriptionState` and we wouldn't have to worry about fetches for these partitions returning. Could that work as well?
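A toy model of the "revoke first" ordering, assuming a simplified consumer that maps each assigned partition to its position: the positions of the partitions being revoked are snapshotted, the partitions are removed from the assignment, and the snapshot is what would be committed. `RevokeFirstSketch` and its methods are hypothetical; the sketch ignores rebalance listener callbacks and asynchronous commit behavior, which is where the real coordinator's constraints would show up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical sketch of revoking before committing: once a partition is
 * removed from the assignment, no further fetch results can advance it.
 */
public class RevokeFirstSketch {
    // Assigned partition -> next offset to consume (its position).
    private final Map<String, Long> positions = new HashMap<>();

    void assign(String partition, long position) { positions.put(partition, position); }

    // Stands in for a fetch response: advances only partitions still assigned.
    void advance(String partition, long records) {
        positions.computeIfPresent(partition, (tp, pos) -> pos + records);
    }

    // Only partitions still in the assignment are eligible for fetching.
    Set<String> fetchablePartitions() { return new TreeSet<>(positions.keySet()); }

    // Snapshot positions, drop the partitions, and return the offsets to commit.
    Map<String, Long> revokeAndCollectOffsets(Set<String> toRevoke) {
        Map<String, Long> toCommit = new HashMap<>();
        for (String tp : toRevoke) {
            Long pos = positions.remove(tp);
            if (pos != null) {
                toCommit.put(tp, pos);
            }
        }
        return toCommit;
    }

    public static void main(String[] args) {
        RevokeFirstSketch consumer = new RevokeFirstSketch();
        consumer.assign("orders-0", 100L);
        consumer.assign("orders-1", 50L);
        consumer.advance("orders-0", 10L);

        Map<String, Long> offsets = consumer.revokeAndCollectOffsets(Set.of("orders-0"));
        System.out.println(offsets);                        // {orders-0=110}
        System.out.println(consumer.fetchablePartitions()); // [orders-1]

        // A late fetch result for the revoked partition is simply ignored.
        consumer.advance("orders-0", 5L);
        System.out.println(consumer.fetchablePartitions()); // [orders-1]
    }
}
```

In this model the late `advance` for a revoked partition has no effect, which is the property the comment is after. It does not show whether committing after revocation is safe in the real client; that is the open question.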



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             joinPrepareTimer.update();
         }
 
+        final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);

Review Comment:
   Is your concern that the subscription could change between the time we pause the partitions and the time the revocation callback is triggered?
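To illustrate the race in question, here is a simplified version of the COOPERATIVE branch of `getPartitionsToRevoke`. Partitions are modeled as `"topic-N"` strings and the subscription as a set of topic names (both assumptions for the sketch; `RevocationSnapshotSketch` is a hypothetical name). If the set is recomputed after the user changes the subscription, it can differ from the set that was paused.

```java
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

/**
 * Hypothetical sketch: in COOPERATIVE mode the partitions to revoke depend on
 * the current subscription, so recomputing them later can disagree with the
 * set that was paused earlier.
 */
public class RevocationSnapshotSketch {
    // "topic-N" -> "topic"; lastIndexOf keeps topics that contain '-' intact.
    static String topicOf(String partition) {
        return partition.substring(0, partition.lastIndexOf('-'));
    }

    // Simplified COOPERATIVE branch: owned partitions whose topic is no longer subscribed.
    static SortedSet<String> partitionsToRevoke(Set<String> owned, Set<String> subscribedTopics) {
        SortedSet<String> result = new TreeSet<>();
        for (String tp : owned) {
            if (!subscribedTopics.contains(topicOf(tp))) {
                result.add(tp);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> owned = Set.of("clicks-0", "orders-0");

        // Computed once, before pausing: the user has unsubscribed from "orders".
        SortedSet<String> paused = partitionsToRevoke(owned, Set.of("clicks"));

        // The user changes the subscription again before the revocation callback runs.
        SortedSet<String> recomputed = partitionsToRevoke(owned, Set.of("orders"));

        System.out.println(paused);     // [orders-0]
        System.out.println(recomputed); // [clicks-0]
        // Reusing the `paused` snapshot for both steps keeps them consistent,
        // at the cost of acting on a possibly stale subscription.
    }
}
```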



-- 
This is an automated message from the Apache Git Service.