Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/06 22:20:52 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #8934: KAFKA-10134: Use long poll if we do not have fetchable partitions

hachikuji commented on a change in pull request #8934:
URL: https://github.com/apache/kafka/pull/8934#discussion_r450504028



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -416,7 +428,13 @@ boolean joinGroupIfNeeded(final Timer timer) {
             }
 
             final RequestFuture<ByteBuffer> future = initiateJoinGroup();
-            client.poll(future, timer);
+
+            // if the flat is not set to true; we only try once and not block on the join result

Review comment:
       Typo: should `flat` be `flag`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1258,11 +1261,19 @@ public void assign(Collection<TopicPartition> partitions) {
         }
     }
 
+    private boolean coordinatorNeededForAssignment() {

Review comment:
       nit: seems unused?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1218,17 +1218,20 @@ public void assign(Collection<TopicPartition> partitions) {
                 throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
             }
 
-            // poll for new data until the timeout expires
             do {
                 client.maybeTriggerWakeup();
 
                 if (includeMetadataInTimeout) {
-                    // try to update assignment metadata BUT do not need to block on the timer,
-                    // since even if we are 1) in the middle of a rebalance or 2) have partitions
-                    // with unknown starting positions we may still want to return some data
-                    // as long as there are some partitions fetchable; NOTE we always use a timer with 0ms
-                    // to never block on completing the rebalance procedure if there's any
-                    updateAssignmentMetadataIfNeeded(time.timer(0L));
+                    // try to update assignment metadata BUT do not need to block on the timer if we still have
+                    // some assigned partitions, since even if we are 1) in the middle of a rebalance
+                    // or 2) have partitions with unknown starting positions we may still want to return some data
+                    // as long as there are some partitions fetchable; NOTE we do not block on rebalancing to complete
+                    // if there's one pending if we still have some fetchable partitions
+                    if (subscriptions.fetchablePartitions(tp -> true).isEmpty()) {

Review comment:
       This call to `fetchablePartitions` is unfortunate. It is an entire pass over all the assigned partitions on every poll. Is there any way it can be avoided?
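
One possible direction, sketched under assumptions rather than taken from the PR: replace the list-building call with a short-circuiting check that returns at the first fetchable partition. `FetchableCheckSketch`, `PartitionState`, `isFetchable`, and `hasAnyFetchable` are hypothetical stand-ins for illustration and are not part of `SubscriptionState`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the consumer's per-partition bookkeeping.
// None of these names exist in Kafka; they only illustrate the idea.
class FetchableCheckSketch {

    static final class PartitionState {
        final boolean paused;
        final boolean hasValidPosition;

        PartitionState(boolean paused, boolean hasValidPosition) {
            this.paused = paused;
            this.hasValidPosition = hasValidPosition;
        }

        // Assumption: a partition is fetchable when it is not paused
        // and has a valid fetch position.
        boolean isFetchable() {
            return !paused && hasValidPosition;
        }
    }

    // Returns as soon as one fetchable partition is found, without
    // allocating a list of all fetchable partitions.
    static boolean hasAnyFetchable(Map<String, PartitionState> assignment) {
        for (PartitionState state : assignment.values()) {
            if (state.isFetchable()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, PartitionState> assignment = new LinkedHashMap<>();
        assignment.put("topic-0", new PartitionState(true, true));   // paused
        assignment.put("topic-1", new PartitionState(false, true));  // fetchable
        System.out.println(hasAnyFetchable(assignment));
    }
}
```

This still walks every partition when none is fetchable, so it only trims the common case; avoiding the pass entirely would need state that is kept up to date as partitions change.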

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -416,7 +428,13 @@ boolean joinGroupIfNeeded(final Timer timer) {
             }
 
             final RequestFuture<ByteBuffer> future = initiateJoinGroup();
-            client.poll(future, timer);
+
+            // if the flat is not set to true; we only try once and not block on the join result
+            if (waitUntilComplete)
+                client.poll(future, timer);
+            else
+                client.poll(timer);

Review comment:
       Do we want this to be `poll(0)`? Otherwise we're still blocking here.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1218,17 +1218,20 @@ public void assign(Collection<TopicPartition> partitions) {
                 throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
             }
 
-            // poll for new data until the timeout expires
             do {
                 client.maybeTriggerWakeup();
 
                 if (includeMetadataInTimeout) {
-                    // try to update assignment metadata BUT do not need to block on the timer,
-                    // since even if we are 1) in the middle of a rebalance or 2) have partitions
-                    // with unknown starting positions we may still want to return some data
-                    // as long as there are some partitions fetchable; NOTE we always use a timer with 0ms
-                    // to never block on completing the rebalance procedure if there's any
-                    updateAssignmentMetadataIfNeeded(time.timer(0L));
+                    // try to update assignment metadata BUT do not need to block on the timer if we still have

Review comment:
       Just making sure I understand the problem. In the old logic, `updateAssignmentMetadataIfNeeded` never blocks. Even if we have no assignment and do not know the coordinator, we won't block here. That means we should fall through to `pollForFetches`. The poll timeout is set by the following logic:
   ```java
           long pollTimeout = coordinator == null ? timer.remainingMs() :
                   Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
   ```
   The only way I can see this logic getting into trouble is if `Heartbeat.timeToNextHeartbeat` returns a small value. If the coordinator remains unknown for a little while and we need to heartbeat, we might get into a busy loop. Is that about right, or are there other cases?
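
To make the scenario concrete, here is a minimal sketch of the timeout expression quoted above. The coordinator is reduced to a nullable `Long` holding its time-to-next-poll value, and `PollTimeoutSketch` and `pollTimeout` are hypothetical names used only for this illustration.

```java
// Hypothetical, self-contained model of the quoted pollTimeout expression.
class PollTimeoutSketch {

    // A null timeToNextPollMs models `coordinator == null`.
    static long pollTimeout(Long timeToNextPollMs, long remainingMs) {
        return timeToNextPollMs == null
                ? remainingMs
                : Math.min(timeToNextPollMs, remainingMs);
    }

    public static void main(String[] args) {
        // No coordinator: the poll may block for the full remaining time.
        System.out.println(pollTimeout(null, 5000L));
        // Heartbeat already due: the timeout collapses to zero, so each
        // loop iteration returns immediately until the timer expires.
        System.out.println(pollTimeout(0L, 5000L));
        // Heartbeat due later: the smaller of the two values wins.
        System.out.println(pollTimeout(3000L, 5000L));
    }
}
```

If the time-to-next-poll value stays at or near zero while the coordinator is unknown, every iteration of the `do` loop gets a near-zero timeout, which is the busy loop described above.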