Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/07 05:58:27 UTC

[GitHub] [kafka] guozhangwang opened a new pull request #8629: MINOR: Update nodesWithPendingFetchRequests in Fetcher before sending request

guozhangwang opened a new pull request #8629:
URL: https://github.com/apache/kafka/pull/8629


   It is possible that:
   
   1. The caller thread calls `client.send()` with the request and wakes up the client.
   2. The heartbeat thread calls `client.poll()` and receives the response, but the handler is not triggered because it has not been added yet.
   3. `this.nodesWithPendingFetchRequests.add(entry.getKey().id())` is called.
   
   In this case, `nodesWithPendingFetchRequests` prevents future requests from being sent to this node (we keep seeing `Skipping fetch for partition...`), but no response will arrive to remove the entry. As a result, the consumer could be blocked without making progress.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang edited a comment on pull request #8629: MINOR: Log4j Improvements on Fetcher

Posted by GitBox <gi...@apache.org>.
guozhangwang edited a comment on pull request #8629:
URL: https://github.com/apache/kafka/pull/8629#issuecomment-625376439


   @mjsax @hachikuji My previous investigation of this issue led to the broker-side `DefaultRecordBatch#setLastOffset`, which would sometimes write garbage into the byte buffer (sometimes deterministic but incorrect values, sometimes totally random values). After 36+ hours of chasing this lead, I came to the conclusion that it is not really a bug in our code but more likely a rabbit hole in my IDEA setup. For example, last night after rebasing I was able to run 500+ times without failure, and it still succeeded after I peeled off each of the changes I had made; this morning, on top of trunk #8400, it fails again at 170+ runs. Given all these observations, I have to say goodbye to my past two days: this is likely not related to any issue on the broker or consumer side.





[GitHub] [kafka] guozhangwang commented on a change in pull request #8629: MINOR: Update nodesWithPendingFetchRequests in Fetcher before sending request

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8629:
URL: https://github.com/apache/kafka/pull/8629#discussion_r421258033



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -258,12 +258,12 @@ public synchronized int sendFetches() {
             if (log.isDebugEnabled()) {
                 log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget);
             }
-            RequestFuture<ClientResponse> future = client.send(fetchTarget, request);
-            // We add the node to the set of nodes with pending fetch requests before adding the
-            // listener because the future may have been fulfilled on another thread (e.g. during a
+            // We add the node to the set of nodes with pending fetch requests before sending the
+            // request to the client because the future may have been fulfilled on another thread (e.g. during a
             // disconnection being handled by the heartbeat thread) which will mean the listener
-            // will be invoked synchronously.
+            // will be invoked synchronously, and hence the added id would not be removed anymore.
             this.nodesWithPendingFetchRequests.add(entry.getKey().id());
+            RequestFuture<ClientResponse> future = client.send(fetchTarget, request);

Review comment:
       This is the actual fix; the others are just log4j improvements I made while debugging.
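
The ordering concern stated in the code comment of this diff can be reproduced in a toy model. The sketch below is not Kafka code: it uses `java.util.concurrent.CompletableFuture` as a stand-in for `RequestFuture`, and the class and method names (`PendingNodesSketch`, `listenerThenAdd`, `addThenListener`) are hypothetical. It only shows that a callback registered on an already-completed future runs immediately, so an id added to the set afterwards is never removed.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

class PendingNodesSketch {
    // Risky ordering: the removal callback is registered before the node is marked pending.
    static Set<Integer> listenerThenAdd(int nodeId) {
        Set<Integer> pending = new HashSet<>();
        // Stand-in for a request that another thread has already completed.
        CompletableFuture<String> future = CompletableFuture.completedFuture("response");
        // The future is already complete, so the callback runs right here and removes nothing.
        future.whenComplete((response, error) -> pending.remove(nodeId));
        // Nothing is left to remove this entry: the node stays "pending" forever.
        pending.add(nodeId);
        return pending;
    }

    // Safe ordering: the node is marked pending before the callback can possibly run.
    static Set<Integer> addThenListener(int nodeId) {
        Set<Integer> pending = new HashSet<>();
        pending.add(nodeId);
        CompletableFuture<String> future = CompletableFuture.completedFuture("response");
        // The callback runs immediately and clears the entry that was added above.
        future.whenComplete((response, error) -> pending.remove(nodeId));
        return pending;
    }

    public static void main(String[] args) {
        System.out.println("listener then add: " + listenerThenAdd(1)); // prints [1]
        System.out.println("add then listener: " + addThenListener(1)); // prints []
    }
}
```

In this model, moving the `add` call ahead of the send, as the diff does, also keeps it ahead of the callback registration, so the stale entry cannot occur regardless of when the future completes.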













[GitHub] [kafka] hachikuji commented on a change in pull request #8629: MINOR: Log4j Improvements on Fetcher

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8629:
URL: https://github.com/apache/kafka/pull/8629#discussion_r421619393



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -260,7 +260,7 @@ public synchronized int sendFetches() {
             }
             RequestFuture<ClientResponse> future = client.send(fetchTarget, request);
             // We add the node to the set of nodes with pending fetch requests before adding the
-            // listener because the future may have been fulfilled on another thread (e.g. during a
+            // listenerbecause the future may have been fulfilled on another thread (e.g. during a

Review comment:
       I assume this was unintentional.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -676,13 +676,15 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
             if (completedFetch.nextFetchOffset == position.offset) {
                 List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
 
+                log.trace("Returning fetched records {} at offset {} for assigned partition {}",
+                        partRecords, position, completedFetch.partition);

Review comment:
       I feel logging all of the records even at TRACE level will be too much. For example, our system tests often have TRACE enabled. Huge single-line log messages are difficult to consume both visually and in systems like elastic.







[GitHub] [kafka] guozhangwang commented on pull request #8629: MINOR: Update nodesWithPendingFetchRequests in Fetcher before sending request

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8629:
URL: https://github.com/apache/kafka/pull/8629#issuecomment-625046006


   @mjsax @hachikuji 
   
   With #8600, I still get various errors (hanging on global restoration, consumer not getting all records in time, etc.) about once every 170 runs; with this PR on top of #8600, I am able to run the integration tests 500+ times without failure.





[GitHub] [kafka] guozhangwang commented on a change in pull request #8629: MINOR: Log4j Improvements on Fetcher

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8629:
URL: https://github.com/apache/kafka/pull/8629#discussion_r421654932



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -676,13 +676,15 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
             if (completedFetch.nextFetchOffset == position.offset) {
                 List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
 
+                log.trace("Returning fetched records {} at offset {} for assigned partition {}",
+                        partRecords, position, completedFetch.partition);

Review comment:
       Makes sense. It was primarily for my local debugging of an integration test which only sends a couple of records. I will print only the number of records instead.
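
A minimal sketch of the count-only idea, assuming a hypothetical helper `summarize` and a plain `long` offset in place of the Fetcher's position object; the final wording of the log line in the PR is not shown in this thread. The point is that the message size stays constant no matter how many records were fetched.

```java
import java.util.ArrayList;
import java.util.List;

class FetchLogSketch {
    // Builds a bounded-size summary: only the record count is included, never the payloads.
    static String summarize(List<?> records, long offset, String partition) {
        return "Returning " + records.size() + " fetched records at offset " + offset
                + " for assigned partition " + partition;
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            records.add("value-" + i);
        }
        // The line stays short even though the batch holds 10,000 records.
        System.out.println(summarize(records, 42L, "topic-0"));
    }
}
```

In the Fetcher itself, the same values would presumably be passed to `log.trace` through `{}` placeholders, with the record list's size in place of the list.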







[GitHub] [kafka] guozhangwang removed a comment on pull request #8629: MINOR: Log4j Improvements on Fetcher

Posted by GitBox <gi...@apache.org>.
guozhangwang removed a comment on pull request #8629:
URL: https://github.com/apache/kafka/pull/8629#issuecomment-625046006


   @mjsax @hachikuji 
   
   With #8600, I still get various errors (hanging on global restoration, consumer not getting all records in time, etc.) about once every 170 runs; with this PR on top of #8600, I am able to run the integration tests 500+ times without failure.

