Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/17 05:07:53 UTC

[GitHub] [kafka] vvcephei opened a new pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

vvcephei opened a new pull request #10137:
URL: https://github.com/apache/kafka/pull/10137


   Implements KIP-695
   
   Reverts a previous behavior change to Consumer.poll and replaces
   it with a new Consumer.currentLag API, which returns the client's
   currently cached lag.
   
   Uses this new API to implement the desired task idling semantics
   improvement from KIP-695.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r581156179



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
##########
@@ -243,6 +244,11 @@
      */
     Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
 
+    /**
+     * @see KafkaConsumer#currentLag(TopicPartition)
+     */
+    OptionalLong currentLag(TopicPartition topicPartition);

Review comment:
       That's a good point, @guozhangwang. Entering the synchronized block will have some overhead each time it's called.
   
   I think we can just reason about the use cases here. My guess is that people would either tend to spot-check specific lags, as we are doing here, or they would tend to periodically check all lags. In the former case, I'd hazard that the current API is fine. In the latter case, we'd face more overhead. I'm sure this is motivated reasoning, but perhaps we can lump the latter case in with @chia7712's suggestion to expose more metadata and defer it to the future.







[GitHub] [kafka] guozhangwang commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r583151649



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -211,7 +192,7 @@ public boolean readyToProcess(final long wallClockTime) {
             return false;
         } else {
             enforcedProcessingSensor.record(1.0d, wallClockTime);
-            logger.info("Continuing to process although some partition timestamps were not buffered locally." +
+            logger.trace("Continuing to process although some partitions are empty on the broker." +

Review comment:
       I thought that if users have not set this to `MAX_TASK_IDLE_MS_DISABLED`, they may care about when enforced processing starts, so isn't it better for this to stay at INFO?







[GitHub] [kafka] vvcephei merged pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #10137:
URL: https://github.com/apache/kafka/pull/10137


   





[GitHub] [kafka] vvcephei commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r580416395



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
##########
@@ -243,6 +244,11 @@
      */
     Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
 
+    /**
+     * @see KafkaConsumer#currentLag(TopicPartition)
+     */
+    OptionalLong currentLag(TopicPartition topicPartition);

Review comment:
       Thanks @ijuma, I considered it, but decided on the current API because:
   1. This is a very quick, local in-memory lookup, so there's no reason to batch multiple requests into one.
   2. It complicates the return type. We'd have to return either a `Map<TP, Long>`, with mappings missing for unknown lags (which creates unfortunate null semantics for users), or a `Map<TP, OptionalLong>`, which creates a complex-to-understand two-hop lookup (`lag:=result.get(tp).get()`). Or else, we could return a more complex domain object like @chia7712 proposed on the mailing list. All these complications seem like unnecessary complexity in the case of this particular API, given the first point.
   
   WDYT?
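
A minimal sketch of the return shapes weighed in point 2, for readers who want to see the difference at the call site. It assumes plain strings in place of `TopicPartition`, and the class and method names (`LagReturnShapes`, `nullableLag`, `twoHopLag`) are invented for illustration; only the single-partition `OptionalLong` shape corresponds to the signature in this PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Illustrative only: partitions are plain Strings here, not TopicPartition.
class LagReturnShapes {
    // Shape 1: Map<TP, Long> with no mapping for unknown lags.
    // The caller gets null for an unknown lag and has to remember to check for it.
    static Long nullableLag(Map<String, Long> lags, String tp) {
        return lags.get(tp);
    }

    // Shape 2: Map<TP, OptionalLong>, the two-hop lookup.
    // Both hops can fail: get(tp) may return null, and getAsLong() throws
    // NoSuchElementException on an empty OptionalLong.
    static long twoHopLag(Map<String, OptionalLong> lags, String tp) {
        return lags.get(tp).getAsLong();
    }

    // Shape 3 (the shape of the method added in this PR): one partition in,
    // one OptionalLong out, with "unknown" expressed as an empty OptionalLong.
    static OptionalLong currentLag(Map<String, Long> cachedLags, String tp) {
        Long lag = cachedLags.get(tp);
        return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
    }

    public static void main(String[] args) {
        Map<String, Long> cache = new HashMap<>();
        cache.put("topic-0", 5L);
        System.out.println(currentLag(cache, "topic-0")); // OptionalLong[5]
        System.out.println(currentLag(cache, "topic-1")); // OptionalLong.empty
    }
}
```

With the third shape the caller can write `currentLag(cache, tp).orElse(defaultValue)` or branch on `isPresent()` without a separate null check.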







[GitHub] [kafka] chia7712 commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r577332179



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
##########
@@ -243,6 +244,11 @@
      */
     Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
 
+    /**
+     * @see KafkaConsumer#currentLag(TopicPartition)
+     */
+    OptionalLong currentLag(TopicPartition topicPartition);

Review comment:
       Pardon me, KIP-695 does not include this change. It seems KIP-695 is still based on `metadata`? Please correct me if I misunderstand anything :)







[GitHub] [kafka] ableegoldman commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r583848994



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -211,7 +192,7 @@ public boolean readyToProcess(final long wallClockTime) {
             return false;
         } else {
             enforcedProcessingSensor.record(1.0d, wallClockTime);
-            logger.info("Continuing to process although some partition timestamps were not buffered locally." +
+            logger.trace("Continuing to process although some partitions are empty on the broker." +

Review comment:
       Maybe we could leave this detailed logging at TRACE, and just print a single message at `warn` the first time this enforced processing occurs?
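
A rough sketch of the "warn once, then trace" idea, assuming an in-memory list in place of a real logger so that it runs on its own. The class name, the `warned` flag, and the message wording are all hypothetical and are not taken from `PartitionGroup`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in: log lines are collected in a list instead of going to a logger.
class EnforcedProcessingLogSketch {
    final List<String> lines = new ArrayList<>();
    private boolean warned = false;

    // Called each time processing is enforced despite empty partitions.
    void recordEnforcedProcessing(String detail) {
        if (!warned) {
            // Emitted only on the first occurrence, so the logs are not flooded.
            warned = true;
            lines.add("WARN  Enforced processing has started; details are logged at TRACE.");
        }
        // The detailed message stays at TRACE on every invocation.
        lines.add("TRACE Continuing to process although some partitions are empty on the broker: " + detail);
    }
}
```

Whether the flag should reset once all partitions have data again is a design choice this sketch leaves open.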







[GitHub] [kafka] chia7712 commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r580741128



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -2219,6 +2221,25 @@ public void resume(Collection<TopicPartition> partitions) {
         }
     }
 
+    /**
+     * Get the consumer's current lag on the partition. Returns an "empty" {@link OptionalLong} if the lag is not known,
+     * for example if there is no position yet, or if the end offset is not known yet.
+     *
+     * <p>
+     * This method uses locally cached metadata and never makes a remote call.
+     *
+     * @param topicPartition The partition to get the lag for.
+     *
+     * @return This {@code Consumer} instance's current lag for the given partition.
+     *
+     * @throws IllegalStateException if the {@code topicPartition} is not assigned
+     **/
+    @Override
+    public OptionalLong currentLag(TopicPartition topicPartition) {
+        final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);

Review comment:
       Other methods call `acquireAndEnsureOpen();` first and then call `release()` in the finally block. Should this new method follow same pattern?
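
For context, the pattern being asked for looks roughly like the sketch below. Only the method names `acquireAndEnsureOpen` and `release` come from the comment above; the class, its fields, and the simplified non-reentrant guard are assumptions made for illustration and do not reproduce the real `KafkaConsumer` internals.

```java
import java.util.ConcurrentModificationException;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a consumer with a single-threaded access guard.
class GuardedConsumerSketch {
    private static final long NO_OWNER = -1L;
    private final AtomicLong ownerThreadId = new AtomicLong(NO_OWNER);
    private volatile boolean closed = false;
    private Long cachedLag = null; // null means the lag is not known yet

    // Simplified, non-reentrant guard: fail fast on concurrent use or use after close.
    private void acquireAndEnsureOpen() {
        long me = Thread.currentThread().getId();
        if (!ownerThreadId.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException("consumer is not safe for multi-threaded access");
        }
        if (closed) {
            release();
            throw new IllegalStateException("This consumer has already been closed.");
        }
    }

    private void release() {
        ownerThreadId.set(NO_OWNER);
    }

    // The lookup itself is wrapped so the guard is always released, even on failure.
    OptionalLong currentLag() {
        acquireAndEnsureOpen();
        try {
            return cachedLag == null ? OptionalLong.empty() : OptionalLong.of(cachedLag);
        } finally {
            release();
        }
    }

    void setCachedLag(long lag) { this.cachedLag = lag; }

    void close() { closed = true; }
}
```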







[GitHub] [kafka] vvcephei commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r577333186



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
##########
@@ -243,6 +244,11 @@
      */
     Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
 
+    /**
+     * @see KafkaConsumer#currentLag(TopicPartition)
+     */
+    OptionalLong currentLag(TopicPartition topicPartition);

Review comment:
       Woah, you are _fast_, @chia7712 !
   
   I just sent a message to the vote thread. I wanted to submit this PR first so that the vote thread message can have the full context available.
   
   Do you mind reading over what I said there? If it sounds good to you, then I'll update the KIP, and we can maybe put this whole mess to bed.







[GitHub] [kafka] chia7712 commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r583423738



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -156,50 +134,53 @@ public boolean readyToProcess(final long wallClockTime) {
             final TopicPartition partition = entry.getKey();
             final RecordQueue queue = entry.getValue();
 
-            final Long nullableFetchedLag = fetchedLags.get(partition);
 
             if (!queue.isEmpty()) {
                 // this partition is ready for processing
                 idlePartitionDeadlines.remove(partition);
                 queued.add(partition);
-            } else if (nullableFetchedLag == null) {
-                // must wait to fetch metadata for the partition
-                idlePartitionDeadlines.remove(partition);
-                logger.trace("Waiting to fetch data for {}", partition);
-                return false;
-            } else if (nullableFetchedLag > 0L) {
-                // must wait to poll the data we know to be on the broker
-                idlePartitionDeadlines.remove(partition);
-                logger.trace(
-                    "Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.",
-                    partition,
-                    nullableFetchedLag
-                );
-                return false;
             } else {
-                // p is known to have zero lag. wait for maxTaskIdleMs to see if more data shows up.
-                // One alternative would be to set the deadline to nullableMetadata.receivedTimestamp + maxTaskIdleMs
-                // instead. That way, we would start the idling timer as of the freshness of our knowledge about the zero
-                // lag instead of when we happen to run this method, but realistically it's probably a small difference
-                // and using wall clock time seems more intuitive for users,
-                // since the log message will be as of wallClockTime.
-                idlePartitionDeadlines.putIfAbsent(partition, wallClockTime + maxTaskIdleMs);
-                final long deadline = idlePartitionDeadlines.get(partition);
-                if (wallClockTime < deadline) {
+                final OptionalLong fetchedLag = lagProvider.apply(partition);

Review comment:
       It was nullable before (when the partition had not been fetched). With this new API, getting the lag for such a partition can produce an exception. Is this a potential bug?







[GitHub] [kafka] vvcephei commented on pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#issuecomment-786747346


   One unrelated test failure:
   ```
   Build / JDK 11 / kafka.api.ConsumerBounceTest.testClose()
   ```





[GitHub] [kafka] ijuma commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r580423836



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
##########
@@ -243,6 +244,11 @@
      */
     Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
 
+    /**
+     * @see KafkaConsumer#currentLag(TopicPartition)
+     */
+    OptionalLong currentLag(TopicPartition topicPartition);

Review comment:
       If there is no batching benefit, then the simpler API makes sense. @hachikuji Any reason why batching could be useful here?







[GitHub] [kafka] guozhangwang commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r580782275



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
##########
@@ -243,6 +244,11 @@
      */
     Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
 
+    /**
+     * @see KafkaConsumer#currentLag(TopicPartition)
+     */
+    OptionalLong currentLag(TopicPartition topicPartition);

Review comment:
       For API calls that may incur a broker round trip, batching of partitions makes sense. For this API I think single partition lookup is good enough.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -156,24 +134,24 @@ public boolean readyToProcess(final long wallClockTime) {
             final TopicPartition partition = entry.getKey();
             final RecordQueue queue = entry.getValue();
 
-            final Long nullableFetchedLag = fetchedLags.get(partition);
+            final OptionalLong fetchedLag = lagProvider.apply(partition);

Review comment:
       Wearing my paranoid hat here: `readyToProcess` is on the critical path, called per record, while we would only update the underlying lag at most as frequently as the consumer poll rate. And in practice we would fall into the first condition `!queue.isEmpty()` most of the time. On the other hand, the `partitionLag` call on `SubscriptionState` is synchronized and could slow down the fetching thread (well, maybe just a bit). So could we call the provider only when necessary, i.e. the queue is empty and the lag is either == 0 or not present?
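
A small sketch of the suggested ordering, assuming a string partition name and a `Function<String, OptionalLong>` as the lag provider. The `State` enum and the `check` method are hypothetical names, not the actual `readyToProcess` code; the point is only that the provider is consulted after the cheap queue check fails.

```java
import java.util.OptionalLong;
import java.util.function.Function;

// Hypothetical sketch: consult the (possibly synchronized) lag provider only when needed.
class LazyLagLookupSketch {
    enum State { READY, WAIT_FOR_FETCH, IDLE }

    static State check(boolean queueIsEmpty,
                       String partition,
                       Function<String, OptionalLong> lagProvider) {
        if (!queueIsEmpty) {
            // Common case on the hot path: records are buffered, no lag lookup at all.
            return State.READY;
        }
        // Only now pay for the lookup.
        OptionalLong lag = lagProvider.apply(partition);
        if (!lag.isPresent() || lag.getAsLong() > 0L) {
            // Lag unknown, or data known to be on the broker: wait for a fetch.
            return State.WAIT_FOR_FETCH;
        }
        // Known zero lag: the caller would start or continue the idle timer here.
        return State.IDLE;
    }
}
```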







[GitHub] [kafka] vvcephei commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r583754418



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -211,7 +192,7 @@ public boolean readyToProcess(final long wallClockTime) {
             return false;
         } else {
             enforcedProcessingSensor.record(1.0d, wallClockTime);
-            logger.info("Continuing to process although some partition timestamps were not buffered locally." +
+            logger.trace("Continuing to process although some partitions are empty on the broker." +

Review comment:
       Thanks @guozhangwang, I thought so as well, but @ableegoldman pointed out to me that it actually prints out on every invocation while we are enforcing processing, which turns out to flood the logs.










[GitHub] [kafka] vvcephei commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r581160931



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -2219,6 +2221,25 @@ public void resume(Collection<TopicPartition> partitions) {
         }
     }
 
+    /**
+     * Get the consumer's current lag on the partition. Returns an "empty" {@link OptionalLong} if the lag is not known,
+     * for example if there is no position yet, or if the end offset is not known yet.
+     *
+     * <p>
+     * This method uses locally cached metadata and never makes a remote call.
+     *
+     * @param topicPartition The partition to get the lag for.
+     *
+     * @return This {@code Consumer} instance's current lag for the given partition.
+     *
+     * @throws IllegalStateException if the {@code topicPartition} is not assigned
+     **/
+    @Override
+    public OptionalLong currentLag(TopicPartition topicPartition) {
+        final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);

Review comment:
       Thanks, I overlooked that, and it's a good idea.







[GitHub] [kafka] vvcephei commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r583755633



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -156,50 +134,53 @@ public boolean readyToProcess(final long wallClockTime) {
             final TopicPartition partition = entry.getKey();
             final RecordQueue queue = entry.getValue();
 
-            final Long nullableFetchedLag = fetchedLags.get(partition);
 
             if (!queue.isEmpty()) {
                 // this partition is ready for processing
                 idlePartitionDeadlines.remove(partition);
                 queued.add(partition);
-            } else if (nullableFetchedLag == null) {
-                // must wait to fetch metadata for the partition
-                idlePartitionDeadlines.remove(partition);
-                logger.trace("Waiting to fetch data for {}", partition);
-                return false;
-            } else if (nullableFetchedLag > 0L) {
-                // must wait to poll the data we know to be on the broker
-                idlePartitionDeadlines.remove(partition);
-                logger.trace(
-                    "Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.",
-                    partition,
-                    nullableFetchedLag
-                );
-                return false;
             } else {
-                // p is known to have zero lag. wait for maxTaskIdleMs to see if more data shows up.
-                // One alternative would be to set the deadline to nullableMetadata.receivedTimestamp + maxTaskIdleMs
-                // instead. That way, we would start the idling timer as of the freshness of our knowledge about the zero
-                // lag instead of when we happen to run this method, but realistically it's probably a small difference
-                // and using wall clock time seems more intuitive for users,
-                // since the log message will be as of wallClockTime.
-                idlePartitionDeadlines.putIfAbsent(partition, wallClockTime + maxTaskIdleMs);
-                final long deadline = idlePartitionDeadlines.get(partition);
-                if (wallClockTime < deadline) {
+                final OptionalLong fetchedLag = lagProvider.apply(partition);

Review comment:
       Thanks, @chia7712. Yes, it can produce an exception now, but only if the partition is not _assigned_. If it simply has not been fetched, then the OptionalLong will be "empty".
   
   I think this exception is fine to raise, since the partition being unassigned at this point seems like an illegal state.
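
The distinction between "not assigned" and "assigned but not yet fetched" can be sketched as follows. This is a toy model with string partitions and a hypothetical `AssignedLagSketch` class; it mirrors the contract stated in the `currentLag` Javadoc (empty when the lag is unknown, `IllegalStateException` when the partition is not assigned), not the real `SubscriptionState`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Toy model: assigned partition -> cached lag (a null value means "assigned, lag not known yet").
class AssignedLagSketch {
    private final Map<String, Long> assigned = new HashMap<>();

    void assign(String tp) {
        assigned.put(tp, null);
    }

    void updateLag(String tp, long lag) {
        if (!assigned.containsKey(tp)) {
            throw new IllegalStateException("No current assignment for partition " + tp);
        }
        assigned.put(tp, lag);
    }

    OptionalLong currentLag(String tp) {
        if (!assigned.containsKey(tp)) {
            // Asking about a partition we do not own is treated as a caller error.
            throw new IllegalStateException("No current assignment for partition " + tp);
        }
        Long lag = assigned.get(tp);
        // Assigned but never fetched: the lag is simply unknown, not an error.
        return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
    }
}
```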







[GitHub] [kafka] vvcephei commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r581160287



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -156,24 +134,24 @@ public boolean readyToProcess(final long wallClockTime) {
             final TopicPartition partition = entry.getKey();
             final RecordQueue queue = entry.getValue();
 
-            final Long nullableFetchedLag = fetchedLags.get(partition);
+            final OptionalLong fetchedLag = lagProvider.apply(partition);

Review comment:
       Thanks, this is a good suggestion.







[GitHub] [kafka] guozhangwang commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r580786789



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
##########
@@ -243,6 +244,11 @@
      */
     Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
 
+    /**
+     * @see KafkaConsumer#currentLag(TopicPartition)
+     */
+    OptionalLong currentLag(TopicPartition topicPartition);

Review comment:
       If we are concerned that users may call this function too frequently while looping over a large number of partitions, with each call synchronizing on the subscription state, then maybe we can offer it in a batching mode.
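
To illustrate the concern, here is a sketch that contrasts one synchronized call per partition with a single synchronized call for a whole collection. The batched method `currentLags` is purely hypothetical and is not part of this PR or of the consumer API; the `lockAcquisitions` counter exists only to make the difference visible.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch: per-partition lookups versus one batched lookup under a single lock.
class BatchedLagSketch {
    private final Map<String, Long> lags = new HashMap<>();
    int lockAcquisitions = 0; // counts lookup calls that entered a synchronized method

    synchronized void update(String tp, long lag) {
        lags.put(tp, lag);
    }

    // Single-partition lookup: enters the monitor once per call.
    synchronized OptionalLong currentLag(String tp) {
        lockAcquisitions++;
        return lookup(tp);
    }

    // Batched lookup (NOT a real API): enters the monitor once for all partitions.
    synchronized Map<String, OptionalLong> currentLags(Collection<String> tps) {
        lockAcquisitions++;
        Map<String, OptionalLong> result = new HashMap<>();
        for (String tp : tps) {
            result.put(tp, lookup(tp));
        }
        return result;
    }

    private OptionalLong lookup(String tp) {
        Long lag = lags.get(tp);
        return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
    }
}
```

Note that the batched form brings back the `Map<TP, OptionalLong>` return type discussed elsewhere in this thread, which is the trade-off being weighed.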







[GitHub] [kafka] ijuma commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r580351600



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
##########
@@ -243,6 +244,11 @@
      */
     Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
 
+    /**
+     * @see KafkaConsumer#currentLag(TopicPartition)
+     */
+    OptionalLong currentLag(TopicPartition topicPartition);

Review comment:
       Quick question, should the API take a `Collection<TopicPartition>` like other APIs?



