You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/07 03:43:19 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #8934: KAFKA-10134: Use long poll if we do not have fetchable partitions

guozhangwang commented on a change in pull request #8934:
URL: https://github.com/apache/kafka/pull/8934#discussion_r450594795



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1218,17 +1218,20 @@ public void assign(Collection<TopicPartition> partitions) {
                 throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
             }
 
-            // poll for new data until the timeout expires
             do {
                 client.maybeTriggerWakeup();
 
                 if (includeMetadataInTimeout) {
-                    // try to update assignment metadata BUT do not need to block on the timer,
-                    // since even if we are 1) in the middle of a rebalance or 2) have partitions
-                    // with unknown starting positions we may still want to return some data
-                    // as long as there are some partitions fetchable; NOTE we always use a timer with 0ms
-                    // to never block on completing the rebalance procedure if there's any
-                    updateAssignmentMetadataIfNeeded(time.timer(0L));
+                    // try to update assignment metadata BUT do not need to block on the timer if we still have
+                    // some assigned partitions, since even if we are 1) in the middle of a rebalance
+                    // or 2) have partitions with unknown starting positions we may still want to return some data
+                    // as long as there are some partitions fetchable; NOTE we do not block on rebalancing to complete
+                    // if there's one pending if we still have some fetchable partitions
+                    if (subscriptions.fetchablePartitions(tp -> true).isEmpty()) {

Review comment:
       Agree, I can think of caching a boolean to avoid it.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1218,17 +1218,20 @@ public void assign(Collection<TopicPartition> partitions) {
                 throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
             }
 
-            // poll for new data until the timeout expires
             do {
                 client.maybeTriggerWakeup();
 
                 if (includeMetadataInTimeout) {
-                    // try to update assignment metadata BUT do not need to block on the timer,
-                    // since even if we are 1) in the middle of a rebalance or 2) have partitions
-                    // with unknown starting positions we may still want to return some data
-                    // as long as there are some partitions fetchable; NOTE we always use a timer with 0ms
-                    // to never block on completing the rebalance procedure if there's any
-                    updateAssignmentMetadataIfNeeded(time.timer(0L));
+                    // try to update assignment metadata BUT do not need to block on the timer if we still have

Review comment:
       My guess is that the `pollDelayMs` from `trySend(timer.currentTimeMs())` is zero and hence
   
   ```
   long pollTimeout = Math.min(timer.remainingMs(), pollDelayMs);
   ```
   
   would become zero.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org