You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/22 21:00:35 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

guozhangwang commented on a change in pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#discussion_r675158219



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -2237,7 +2237,21 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
         acquireAndEnsureOpen();
         try {
             final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);
-            return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
+
+            // if the log end offset is not known and hence cannot return lag,
+            // issue a list offset request for that partition so that next time
+            // we may get the answer; we do not need to wait for the return value
+            // since we would not try to poll the network client synchronously
+            if (lag == null) {
+                if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null) {
+                    log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
+                    fetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L));

Review comment:
       I modified the fetcher so that it would not wait for the future to complete, with timer(0) it would not be a blocking call. Or did I miss anything?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org