Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/04 07:06:22 UTC

[GitHub] [kafka] showuon commented on a change in pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics

showuon commented on a change in pull request #11460:
URL: https://github.com/apache/kafka/pull/11460#discussion_r742575556



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -2007,7 +2007,7 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time
      * Suspend fetching from the requested partitions. Future calls to {@link #poll(Duration)} will not return
      * any records from these partitions until they have been resumed using {@link #resume(Collection)}.
      * Note that this method does not affect partition subscription. In particular, it does not cause a group
-     * rebalance when automatic assignment is used.
+     * rebalance when automatic assignment is used. And groupRebalance does not preserve pause state.

Review comment:
       How about this:
   ```
   * ...
   * rebalance when automatic assignment is used.
   * 
   * Note: Rebalance will not preserve the pause/resume state.
   ```
   
   

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -307,6 +307,10 @@ private Exception invokePartitionsAssigned(final Set<TopicPartition> assignedPar
 
     private Exception invokePartitionsRevoked(final Set<TopicPartition> revokedPartitions) {
         log.info("Revoke previously assigned partitions {}", Utils.join(revokedPartitions, ", "));
+        Set<TopicPartition> revokePausedPartitions = subscriptions.pausedPartitions();
+        revokePausedPartitions.retainAll(revokedPartitions);
+        if (!revokePausedPartitions.isEmpty())
+            log.info("Revoke previously paused partitions {}", Utils.join(revokePausedPartitions, ", "));

Review comment:
       I'm afraid that users might get confused, because it looks like we revoke twice:
   ```
   Revoke previously assigned partitions tp1, tp2.
   Revoke previously paused partitions tp1
   ```
   
   How about this instead:
   `log.info("The pause flag in partitions [{}] will be removed due to revocation.", Utils.join(revokePausedPartitions, ", "));`
   WDYT?
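
To make the suggestion concrete, here is a minimal stand-alone sketch of the idea, not the actual ConsumerCoordinator change. It assumes partitions can be modelled as plain strings instead of `TopicPartition`, and the class and method names (`PausedRevocationSketch`, `pausedAndRevoked`, `revocationMessage`) are hypothetical. It intersects the paused set with the revoked set on a copy, so neither input is mutated, and only builds the message when something paused is actually being revoked.

```java
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration only. Partitions are plain strings
// so the sketch runs without the Kafka client jar.
class PausedRevocationSketch {

    // Returns the paused partitions that are also being revoked.
    // Works on a sorted copy, so neither input set is modified.
    static Set<String> pausedAndRevoked(Set<String> paused, Set<String> revoked) {
        Set<String> result = new TreeSet<>(paused);
        result.retainAll(revoked);
        return result;
    }

    // Builds the message text proposed in the review comment, or returns
    // an empty Optional when no paused partition is being revoked.
    static Optional<String> revocationMessage(Set<String> paused, Set<String> revoked) {
        Set<String> affected = pausedAndRevoked(paused, revoked);
        if (affected.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of("The pause flag in partitions [" + String.join(", ", affected)
                + "] will be removed due to revocation.");
    }

    public static void main(String[] args) {
        Set<String> paused = new TreeSet<>(Arrays.asList("tp1", "tp3"));
        Set<String> revoked = new TreeSet<>(Arrays.asList("tp1", "tp2"));
        // Only tp1 is both paused and revoked, so only tp1 appears in the message.
        revocationMessage(paused, revoked).ifPresent(System.out::println);
    }
}
```

With this wording the second log line describes what happens to the pause flag, so it no longer reads like a second revocation.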
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org