Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/15 05:52:30 UTC

[GitHub] [kafka] guozhangwang opened a new pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

guozhangwang opened a new pull request #11057:
URL: https://github.com/apache/kafka/pull/11057


   This is an idea I had for attacking this on the consumer client level:
   
   1. When a listOffset result is retrieved inside the Fetcher, check whether the partitions are part of the consumer's subscription; if so, update the corresponding LSO or HW, depending on the isolation level.
   2. When partitionLag cannot return a result because the log end offset (LSO/HW) is not known, send an async list-offset request, which will be completed by a later polling call (the heartbeat thread may complete it as well), in the hope that the next partitionLag call gets the result.
   
   Then on the Streams side, the first partitionLag call would still return empty, but a subsequent call should soon return data, so we would not have to wait for a fetch response to update the fetched state.
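
A minimal sketch of the idea described above, not the code in this PR: the class `LagTrackerSketch`, its string partition keys, and the injected `listEndOffset` function are all hypothetical stand-ins for the consumer's subscription state and its list-offsets request. The first lag lookup returns empty and starts an asynchronous refresh; a later lookup sees the cached end offset.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for consumer-side state; not Kafka's SubscriptionState.
final class LagTrackerSketch {
    private final Map<String, Long> position = new ConcurrentHashMap<>();
    private final Map<String, Long> endOffset = new ConcurrentHashMap<>();
    // Assumed async lookup standing in for a list-offsets request.
    private final Function<String, CompletableFuture<Long>> listEndOffset;

    LagTrackerSketch(Function<String, CompletableFuture<Long>> listEndOffset) {
        this.listEndOffset = listEndOffset;
    }

    void seek(String partition, long offset) {
        position.put(partition, offset);
    }

    // Returns the lag when both the position and the end offset are known.
    // If the end offset is unknown, starts a refresh and returns empty without blocking.
    OptionalLong currentLag(String partition) {
        Long pos = position.get(partition);
        Long end = endOffset.get(partition);
        if (pos != null && end != null) {
            return OptionalLong.of(end - pos);
        }
        if (end == null) {
            // The callback caches the end offset whenever the future completes.
            listEndOffset.apply(partition).thenAccept(v -> endOffset.put(partition, v));
        }
        return OptionalLong.empty();
    }
}
```

Note that this sketch starts a new lookup on every call while the end offset is unknown; avoiding those repeated requests is a separate concern.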
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-880416855


   @vvcephei @ableegoldman @showuon LMK what you think about this approach. I have not done the due diligence on test coverage yet; I will do that if people are +1 on this direction. If we feel this is a general issue for normal consumers as well, then this approach would be better than just fixing it at the Streams layer.





[GitHub] [kafka] guozhangwang commented on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-885943296


   @ableegoldman I updated the PR to move to the "subscription" layer, so that effectively this would only apply to partitions that are in the subscription, and only for end-offsets in list offset requests. LMK WDYT.





[GitHub] [kafka] ableegoldman commented on a change in pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#discussion_r675163448



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -2237,7 +2237,21 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
         acquireAndEnsureOpen();
         try {
             final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);
-            return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
+
+            // if the log end offset is not known and hence cannot return lag,
+            // issue a list offset request for that partition so that next time
+            // we may get the answer; we do not need to wait for the return value
+            // since we would not try to poll the network client synchronously
+            if (lag == null) {
+                if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null) {
+                    log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
+                    fetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L));

Review comment:
       Ah sorry, I overlooked that we passed in a timeout of 0 (I originally thought that would throw a TimeoutException, but I see now it would just return, so never mind this).
   
   However I do think it's probably worth taking care not to fire off a million requests per second (possible slight over-exaggeration) when we're just waiting on the same partition(s). It shouldn't be too complicated to avoid sending duplicated requests so imo it's not over-optimization...thoughts?







[GitHub] [kafka] showuon commented on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-882498141


   After testing, **I confirmed that this fix resolves the issue**. It may fetch the offset more eagerly than before, but it looks like we need those fetches to fix the stuck-stream issue. Thank you.





[GitHub] [kafka] ableegoldman commented on a change in pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#discussion_r675269697



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -907,14 +944,23 @@ private void maybeSetOffsetForLeaderException(RuntimeException e) {
         final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size());
 
         for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry : timestampsToSearchByNode.entrySet()) {
-            RequestFuture<ListOffsetResult> future =
-                sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps);
+            // we skip sending the list offset request only if there's already one with the exact
+            // requested offsets for the destination node

Review comment:
       Hm, I wonder if deduplicating like this within the Fetcher itself is too low-level, i.e. there may be other callers of `sendListOffsetsRequests` that actually do want to issue a new request. I think there are arguments to be made for doing this for all requests, but maybe also some arguments against it: this is a more drastic change that means APIs like `Consumer#endOffsets` can actually return old/stale results (stale by up to the configured `request.timeout.ms`).
   
   Since this is a last-minute blocker fix I'd prefer to keep the changes to a minimum and scoped to the specific bug, if at all possible. Can we do the deduplication in another layer, so that we only avoid re-sending the listOffsets request in the specific case of `currentLag`, where we know it's acceptable to report a slightly-out-of-date value because the alternative is to report no value at all? 
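
One way to scope the deduplication to the lag path, sketched under assumptions: a per-partition "end offset requested" flag kept next to the cached end offset, so that only the lag lookup consults it and other list-offsets callers are unaffected. The class and method names below are hypothetical and are not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-partition bookkeeping; names are illustrative only.
final class EndOffsetRequestStateSketch {
    private static final class PartitionState {
        Long endOffset;             // null until a list-offsets response arrives
        boolean endOffsetRequested; // true while a refresh is outstanding
    }

    private final Map<String, PartitionState> assigned = new HashMap<>();

    void assign(String partition) {
        assigned.putIfAbsent(partition, new PartitionState());
    }

    // Returns true only when the caller should actually send a request now:
    // the partition is assigned, its end offset is unknown, and no refresh is pending.
    boolean maybeRequestEndOffset(String partition) {
        PartitionState state = assigned.get(partition);
        if (state == null || state.endOffset != null || state.endOffsetRequested) {
            return false;
        }
        state.endOffsetRequested = true;
        return true;
    }

    // Called from the response path; partitions outside the assignment are ignored.
    void updateEndOffset(String partition, long offset) {
        PartitionState state = assigned.get(partition);
        if (state != null) {
            state.endOffset = offset;
            state.endOffsetRequested = false;
        }
    }

    // Returns the cached end offset, or null if unknown or unassigned.
    Long endOffset(String partition) {
        PartitionState state = assigned.get(partition);
        return state == null ? null : state.endOffset;
    }
}
```

A real implementation would also need to clear the flag when a request fails or times out; this sketch leaves that out.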







[GitHub] [kafka] showuon commented on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-881783736


   Let me test it, and let you know. BTW, I like this simple client side fix!





[GitHub] [kafka] ableegoldman commented on a change in pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#discussion_r675136826



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -2237,7 +2237,21 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
         acquireAndEnsureOpen();
         try {
             final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);
-            return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
+
+            // if the log end offset is not known and hence cannot return lag,
+            // issue a list offset request for that partition so that next time
+            // we may get the answer; we do not need to wait for the return value
+            // since we would not try to poll the network client synchronously
+            if (lag == null) {
+                if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null) {
+                    log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
+                    fetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L));

Review comment:
       Isn't this actually a blocking call? I couldn't find anything that asserted yes or no in the javadocs, but this ultimately calls down into `Fetcher#fetchOffsetsByTimes` which does seem to wait for the request future to complete (in fact, it seems to be doing a busy wait on the timer...? that doesn't seem right 🤔 )













[GitHub] [kafka] guozhangwang commented on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-885964615


   Cherry-picked to 3.0 cc @kkonstantine 





[GitHub] [kafka] showuon edited a comment on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-882498141


   After testing, **I confirmed that this fix resolves the issue**. It may fetch the offset a little more eagerly than before, but it looks like we need those fetches to fix the stuck-stream issue. Thank you.





[GitHub] [kafka] guozhangwang commented on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-880417140


   Also cc @cmccabe @hachikuji .











[GitHub] [kafka] guozhangwang commented on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-881696896


   Thanks @vvcephei, in the meantime I will try to just run the test from https://issues.apache.org/jira/browse/KAFKA-9295 and see if it indeed helps reduce the flakiness. Also cc @showuon, who discovered the issue from that test.





[GitHub] [kafka] guozhangwang commented on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-885328621


   I added a concurrent hash map inside the fetcher that tracks the in-flight list-offset futures, and use it to skip sending a request that duplicates one already in flight. LMK if it looks good, and then I will merge. @vvcephei @ableegoldman
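
The in-flight map described above could look roughly like the following sketch. It is an illustration under assumptions, not the Fetcher change itself: `InFlightDedupSketch`, the string partition names, and the injected `sender` function are hypothetical, and `CompletableFuture` stands in for the client's own request future type.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: reuse the future of an identical request that is still in flight.
final class InFlightDedupSketch {
    // Keyed by the exact set of requested partitions (a real key might also include the node).
    private final Map<Set<String>, CompletableFuture<Map<String, Long>>> inFlight =
            new ConcurrentHashMap<>();
    // Assumed function that actually sends the request.
    private final Function<Set<String>, CompletableFuture<Map<String, Long>>> sender;

    InFlightDedupSketch(Function<Set<String>, CompletableFuture<Map<String, Long>>> sender) {
        this.sender = sender;
    }

    CompletableFuture<Map<String, Long>> send(Set<String> partitions) {
        Set<String> key = new HashSet<>(partitions); // defensive copy used as the map key
        boolean[] created = {false};
        CompletableFuture<Map<String, Long>> future = inFlight.computeIfAbsent(key, k -> {
            created[0] = true;
            return sender.apply(k);
        });
        if (created[0]) {
            // Drop the entry once the request finishes so later calls send a fresh request.
            future.whenComplete((result, error) -> inFlight.remove(key, future));
        }
        return future;
    }
}
```

Because callers that arrive while a request is pending share its future, they can observe a result that is as old as that request, which is the staleness trade-off raised in the review.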





[GitHub] [kafka] guozhangwang commented on a change in pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#discussion_r675158219



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -2237,7 +2237,21 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
         acquireAndEnsureOpen();
         try {
             final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);
-            return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
+
+            // if the log end offset is not known and hence cannot return lag,
+            // issue a list offset request for that partition so that next time
+            // we may get the answer; we do not need to wait for the return value
+            // since we would not try to poll the network client synchronously
+            if (lag == null) {
+                if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null) {
+                    log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
+                    fetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L));

Review comment:
       I modified the fetcher so that it does not wait for the future to complete; with timer(0) it should not be a blocking call. Or did I miss anything?
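
To make the zero-timeout behavior concrete, here is a small sketch under assumptions. It uses `CompletableFuture` rather than the client's internal future and timer types, and the helper `ZeroTimeoutSketch.await` is hypothetical: with a timeout of 0 it never waits and only reports a value that is already available, while a positive timeout waits and fails if the deadline passes.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper illustrating "timeout 0 means do not block".
final class ZeroTimeoutSketch {
    static <T> Optional<T> await(CompletableFuture<T> future, long timeoutMs) {
        if (timeoutMs == 0L) {
            // Fire-and-forget: return the value only if it happens to be ready already.
            return Optional.ofNullable(future.getNow(null));
        }
        try {
            return Optional.ofNullable(future.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            throw new IllegalStateException("timed out after " + timeoutMs + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```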







[GitHub] [kafka] guozhangwang merged pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #11057:
URL: https://github.com/apache/kafka/pull/11057


   





[GitHub] [kafka] showuon commented on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-881784596


   Also, @guozhangwang, if you just run this test in KAFKA-9295, it's really difficult to reproduce the issue; the test needs some tweaking first. Anyway, I will test it and let you know. :)




