Posted to jira@kafka.apache.org by "RivenSun2 (via GitHub)" <gi...@apache.org> on 2023/02/17 09:11:13 UTC

[GitHub] [kafka] RivenSun2 opened a new pull request, #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

RivenSun2 opened a new pull request, #13270:
URL: https://github.com/apache/kafka/pull/13270

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "RivenSun2 (via GitHub)" <gi...@apache.org>.
RivenSun2 commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1441096550

   Hi @guozhangwang 
   could you give any suggestions?
   Thanks.




[GitHub] [kafka] showuon commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1442748397

   > Hi @showuon Thank you for your reply. Happy to submit a short-term solution. I will submit new code changes later. Thanks.
   
   Great! Thank you!




[GitHub] [kafka] philipnee commented on a diff in pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13270:
URL: https://github.com/apache/kafka/pull/13270#discussion_r1117278567


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -355,6 +355,12 @@ protected synchronized long timeToNextHeartbeat(long now) {
         // we don't need to send heartbeats
         if (state.hasNotJoinedGroup())
             return Long.MAX_VALUE;
+        if (heartbeatThread != null) {

Review Comment:
   I'm also curious: do we actually need to perform a null check on heartbeatThread? Don't we always create one when we try to join, or are already in, a group?





[GitHub] [kafka] RivenSun2 commented on a diff in pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "RivenSun2 (via GitHub)" <gi...@apache.org>.
RivenSun2 commented on code in PR #13270:
URL: https://github.com/apache/kafka/pull/13270#discussion_r1119593189


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -355,6 +355,12 @@ protected synchronized long timeToNextHeartbeat(long now) {
         // we don't need to send heartbeats
         if (state.hasNotJoinedGroup())
             return Long.MAX_VALUE;
+        if (heartbeatThread != null) {

Review Comment:
   Updated.





[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "RivenSun2 (via GitHub)" <gi...@apache.org>.
RivenSun2 commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1442743247

   Hi @showuon Thank you for your reply.
   Happy to submit a short-term solution. I will submit new code changes later.
   Thanks.




[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "RivenSun2 (via GitHub)" <gi...@apache.org>.
RivenSun2 commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1442658265

   Hi @showuon  @guozhangwang  @philipnee 
   Thank you for your suggestions.
   If you guys fix this issue in the future, please help update the JIRA status.
   Thanks.




[GitHub] [kafka] RivenSun2 commented on a diff in pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "RivenSun2 (via GitHub)" <gi...@apache.org>.
RivenSun2 commented on code in PR #13270:
URL: https://github.com/apache/kafka/pull/13270#discussion_r1117934696


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -355,6 +355,12 @@ protected synchronized long timeToNextHeartbeat(long now) {
         // we don't need to send heartbeats
         if (state.hasNotJoinedGroup())
             return Long.MAX_VALUE;
+        if (heartbeatThread != null) {

Review Comment:
   @philipnee For a KafkaConsumer that has group.id set but uses **assign** mode, the coordinator is created but the heartbeat thread is not. So the null check here makes sense.





[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "RivenSun2 (via GitHub)" <gi...@apache.org>.
RivenSun2 commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1447472363

   Hi @showuon  could you help review this PR?
   Thanks.




[GitHub] [kafka] philipnee commented on a diff in pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13270:
URL: https://github.com/apache/kafka/pull/13270#discussion_r1117263913


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -355,6 +355,12 @@ protected synchronized long timeToNextHeartbeat(long now) {
         // we don't need to send heartbeats
         if (state.hasNotJoinedGroup())
             return Long.MAX_VALUE;
+        if (heartbeatThread != null) {

Review Comment:
   We could 1 line it ya?
   ```
   if (heartbeatThread != null && heartbeatThread.hasFailed()) {
   ...
   }
   ```
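A minimal sketch of the one-line guard suggested above, assuming the coordinator rethrows the heartbeat thread's recorded failure instead of returning a heartbeat delay. `SketchCoordinator`, `SketchHeartbeatThread`, `recordFailure`, `failureCause`, `joinedGroup` and `nextHeartbeatDueMs` are hypothetical stand-ins, not Kafka's actual classes; only the shape of the `heartbeatThread != null && heartbeatThread.hasFailed()` check comes from the review. The null case stands for a consumer that has `group.id` set but uses `assign()`, where, per this thread, no heartbeat thread is created.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a background heartbeat thread (not Kafka's class).
class SketchHeartbeatThread {
    // Set by the background thread when it exits abnormally.
    private final AtomicReference<RuntimeException> failure = new AtomicReference<>();

    void recordFailure(RuntimeException cause) {
        failure.compareAndSet(null, cause); // keep the first failure only
    }

    boolean hasFailed() {
        return failure.get() != null;
    }

    RuntimeException failureCause() {
        return failure.get();
    }
}

// Hypothetical coordinator showing only the timeToNextHeartbeat guard.
class SketchCoordinator {
    // Stays null for a consumer with group.id set that uses assign():
    // the coordinator exists, but no heartbeat thread is started.
    SketchHeartbeatThread heartbeatThread;
    boolean joinedGroup;
    long nextHeartbeatDueMs;

    synchronized long timeToNextHeartbeat(long nowMs) {
        // Not in a group: no heartbeats are needed.
        if (!joinedGroup)
            return Long.MAX_VALUE;
        // One-line, null-safe guard: surface the background failure to the
        // caller instead of reporting a heartbeat that is forever overdue (0 ms).
        if (heartbeatThread != null && heartbeatThread.hasFailed())
            throw heartbeatThread.failureCause();
        return Math.max(0L, nextHeartbeatDueMs - nowMs);
    }
}
```

Because `&&` short-circuits, a null `heartbeatThread` never reaches `hasFailed()`, so an assign-mode consumer keeps getting a normal delay.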





[GitHub] [kafka] philipnee commented on a diff in pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13270:
URL: https://github.com/apache/kafka/pull/13270#discussion_r1117261344


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java:
##########
@@ -1206,10 +1206,21 @@ public void testUncaughtExceptionInHeartbeatThread() throws Exception {
                 throw e;
             return false;
         }, heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR));
+        coordinator.ensureActiveGroup();
+        mockTime.sleep(HEARTBEAT_INTERVAL_MS);
+
+        try {

Review Comment:
   Do you think it would be cleaner to use assertThrows? Something like the following. That way we won't need `fail` followed by `assertEquals` (and the test message can be more accurate).
   ```
   assertThrows(SadException.class,
           () -> coordinator.sadPath());
   ```





[GitHub] [kafka] guozhangwang commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1442291215

   Hi @RivenSun2, after some thought I also agree with @philipnee on throwing an exception directly from the foreground thread's callers. We can defer more complicated improvements, such as adding new states or categorizing the different reasons that kill the heartbeat thread, to future work, as @kirktrue is revamping the consumer now.




[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "RivenSun2 (via GitHub)" <gi...@apache.org>.
RivenSun2 commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1442714319

   Sorry, maybe I didn't make it clear.
   I also suggest tracking this issue with KAFKA-14729. I just hope you guys can assign this JIRA to the relevant experts, and once the problem is properly solved in the future, update the status of KAFKA-14729 to resolved.
   Thanks.




[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "RivenSun2 (via GitHub)" <gi...@apache.org>.
RivenSun2 commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1449707662

   The failed test cases should be unrelated to this PR.




[GitHub] [kafka] showuon merged pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon merged PR #13270:
URL: https://github.com/apache/kafka/pull/13270




[GitHub] [kafka] guozhangwang commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1434906588

   This is an interesting find, thanks @RivenSun2. In general, I think that if the background thread dies for whatever reason, we should consider the following actions in order of precedence:
   
   1) Make sure the consumer now falls into an abnormal state in which it does not return data and does not tie up the caller thread. This also serves to notify the user.
   
   2) Try to "self-heal" by re-creating the thread (we do not need to do it in this PR, just laying the groundwork here), in order to bring the consumer back to a normal state.
   
   3) If we cannot self-heal the consumer and it simply becomes useless, let the consumer throw an exception for any API call so that the caller thread can go ahead and create a brand-new consumer.
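The three steps above could be sketched roughly as a supervisor that checks its background thread at the start of every API call. This is a hypothetical illustration, not Kafka code or a concrete proposal: `SelfHealingSupervisor`, `ensureHealthy`, `currentWorker` and the restart budget `maxRestarts` are invented names, and a dead thread is detected through `Thread.setUncaughtExceptionHandler` rather than whatever mechanism the real coordinator uses.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical supervisor (not Kafka code) mirroring the three steps:
// (1) never proceed while the background thread is dead, (2) self-heal by
// re-creating it, (3) give up with an exception once a restart budget is spent.
final class SelfHealingSupervisor {
    private final Supplier<Runnable> taskFactory; // body for each new background thread
    private final int maxRestarts;
    private final AtomicReference<Throwable> lastFailure = new AtomicReference<>();
    private Thread worker;
    private int restarts;

    SelfHealingSupervisor(Supplier<Runnable> taskFactory, int maxRestarts) {
        this.taskFactory = taskFactory;
        this.maxRestarts = maxRestarts;
    }

    // Intended to run at the top of every public API call, before any data is returned.
    synchronized void ensureHealthy() {
        if (worker == null) {                  // first call: start the background thread
            startWorker();
            return;
        }
        Throwable failure = lastFailure.get();
        if (failure == null)
            return;                            // background thread has not failed
        if (restarts >= maxRestarts)           // step 3: unusable, caller must recreate the client
            throw new IllegalStateException("background thread keeps failing", failure);
        restarts++;                            // step 2: self-heal with a fresh thread
        lastFailure.set(null);
        startWorker();
    }

    synchronized Thread currentWorker() {
        return worker;
    }

    synchronized int restarts() {
        return restarts;
    }

    private void startWorker() {
        Thread t = new Thread(taskFactory.get(), "sketch-background");
        t.setDaemon(true);
        // Record the failure instead of letting it disappear with the thread.
        t.setUncaughtExceptionHandler((thread, e) -> lastFailure.set(e));
        worker = t;
        t.start();
    }
}
```

Bounding the restarts matters: without a budget, a thread that fails deterministically would be re-created forever, which is just another busy loop.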




[GitHub] [kafka] guozhangwang commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1434909837

   As for this PR, I'm wondering whether it makes sense to introduce a new state in `MemberState`, e.g. "ABNORMAL", to indicate that the consumer is not usable at the moment (even if it can still fetch records successfully, since fetching does not rely on the heartbeat thread). In that state we should not proceed to return any more data, so that the caller of this consumer is notified.
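A rough sketch of the proposed state, assuming it is modeled as an extra enum value plus a guard in `poll()`. `SketchMemberState`, `StateGuardedConsumer` and `onHeartbeatThreadDeath` are hypothetical; apart from ABNORMAL, the enum values are illustrative only and are not copied from Kafka's `MemberState`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical states; only ABNORMAL reflects the proposal, the rest are illustrative.
enum SketchMemberState { UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, ABNORMAL }

final class StateGuardedConsumer {
    private volatile SketchMemberState state = SketchMemberState.STABLE;
    private final List<String> fetched; // stands in for records the fetcher already has

    StateGuardedConsumer(List<String> fetched) {
        this.fetched = new ArrayList<>(fetched);
    }

    // Would be triggered when the heartbeat thread exits abnormally.
    void onHeartbeatThreadDeath() {
        state = SketchMemberState.ABNORMAL;
    }

    SketchMemberState state() {
        return state;
    }

    List<String> poll() {
        // Fetching does not depend on the heartbeat thread, so records may be
        // available; refuse to return them so the caller notices the problem.
        if (state == SketchMemberState.ABNORMAL)
            throw new IllegalStateException("consumer is ABNORMAL: heartbeat thread died");
        return Collections.unmodifiableList(fetched);
    }
}
```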
   
   cc @kirktrue @philipnee 




[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "RivenSun2 (via GitHub)" <gi...@apache.org>.
RivenSun2 commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1435433330

   Hi @guozhangwang, thank you for your reply.
   Could we introduce a state like "ABNORMAL" in a separate PR? This state would only appear on consumers with heartbeat threads (group.id != null).
   Introducing this state also requires weighing the value it brings against its impact on the existing code logic. I think it's a big enough change that it may even need a KIP.
   
   Overall, this PR is meant to be a small change that fixes the high CPU usage of the current pollForFetches method.
   What do you think? Thanks.
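To make the CPU problem concrete, here is a simplified model. It assumes, as an illustration rather than a description of Kafka's code, that `pollForFetches` bounds each network wait by the smaller of the caller's remaining timeout and the time until the next heartbeat is due. Once the heartbeat thread has died, the heartbeat stays overdue, so that bound is 0 and every wait returns immediately. `SpinModel` and its methods are hypothetical; times are in microseconds, and a zero-length wait is assumed to cost 1 microsecond.

```java
// Simplified model, not Kafka code: a poll loop whose network wait is bounded
// by min(time until next heartbeat, remaining user timeout). Times in microseconds.
final class SpinModel {
    private SpinModel() {}

    // Time until the next heartbeat is due; an overdue heartbeat yields 0.
    static long timeToNextHeartbeat(long lastHeartbeatUs, long intervalUs, long nowUs) {
        return Math.max(0L, lastHeartbeatUs + intervalUs - nowUs);
    }

    // Upper bound for one network wait.
    static long pollTimeout(long timeToNextHeartbeatUs, long remainingUs) {
        return Math.min(timeToNextHeartbeatUs, remainingUs);
    }

    // Counts the waits performed during one poll(userTimeoutUs) call.
    static long countWaits(boolean heartbeatThreadAlive, long intervalUs,
                           long userTimeoutUs, long lastHeartbeatUs) {
        long now = 0;
        long last = lastHeartbeatUs;
        long waits = 0;
        while (now < userTimeoutUs) {
            // A live heartbeat thread sends the overdue heartbeat, resetting the timer.
            if (heartbeatThreadAlive && timeToNextHeartbeat(last, intervalUs, now) == 0)
                last = now;
            long t = pollTimeout(timeToNextHeartbeat(last, intervalUs, now), userTimeoutUs - now);
            now += Math.max(t, 1L); // assume a zero-length wait still burns ~1 us of CPU
            waits++;
        }
        return waits;
    }
}
```

With a 3 s heartbeat interval, a 1 s poll timeout and a heartbeat last sent 5 s ago, `countWaits(true, 3_000_000L, 1_000_000L, -5_000_000L)` returns 1, while the same call with `false` returns 1_000_000: one `poll` becomes a million back-to-back zero-length waits.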




[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "RivenSun2 (via GitHub)" <gi...@apache.org>.
RivenSun2 commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1442822672

   Updated the code and test case.




[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "RivenSun2 (via GitHub)" <gi...@apache.org>.
RivenSun2 commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1435480289

   @showuon Thanks for your suggestions!
   To be honest, I had also considered throwing an exception directly in the `timeToNextHeartbeat` method, since that is also what the `AbstractCoordinator#pollHeartbeat` method does.
   Throwing an exception here lets most users break out of their loop (including the recommended usage in the `KafkaConsumer` class Javadoc).
   However, if the user writes code like the following, it will still fall into an infinite loop, **even though they have already observed that the poll method threw an exception.**
   ```
   while (true) {
       try {
           consumer.poll(duration);
       } catch (Exception e) {
           log.error("has error when consumer poll!", e);
       }
   }
   ```
   The current change in this PR also handles this case.
   If we don't consider this case, I think throwing an exception here is one of the easiest ways in the short term.
   Thanks.




[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "RivenSun2 (via GitHub)" <gi...@apache.org>.
RivenSun2 commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1442659692

   By the way, you can assign that JIRA to the relevant experts.




[GitHub] [kafka] philipnee commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1442706893

   Hey, @RivenSun2, thanks again for the PR and response - I think we still need to address this with the current client.  We are currently trying to refactor the consumer code, but that (the new client) will come later, so I think this jira is appropriate for this issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] RivenSun2 commented on a diff in pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "RivenSun2 (via GitHub)" <gi...@apache.org>.
RivenSun2 commented on code in PR #13270:
URL: https://github.com/apache/kafka/pull/13270#discussion_r1117936096


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java:
##########
@@ -1206,10 +1206,21 @@ public void testUncaughtExceptionInHeartbeatThread() throws Exception {
                 throw e;
             return false;
         }, heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR));
+        coordinator.ensureActiveGroup();
+        mockTime.sleep(HEARTBEAT_INTERVAL_MS);
+
+        try {

Review Comment:
   I agree in general. But here, calling coordinator.timeToNextHeartbeat(0) or coordinator.pollHeartbeat(mockTime.milliseconds()) **only once** may not throw an exception, because the background heartbeat thread may not have run and failed yet at that point.
   We expect that within one second, the background heartbeat thread will run and eventually exit abnormally.
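The timing concern above can be illustrated with a small polling helper: instead of asserting after a single call, re-evaluate the condition until it holds or a deadline passes. `WaitSketch.waitFor` is a hypothetical helper written for this example using only the JDK; it is not an existing Kafka test utility.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical test helper (JDK only): re-evaluates a condition until it holds
// or the deadline passes, instead of asserting after a single call.
final class WaitSketch {
    private WaitSketch() {}

    static boolean waitFor(BooleanSupplier condition, long timeoutMs, long pauseMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (condition.getAsBoolean())
                return true;                        // condition observed
            if (System.nanoTime() - deadline >= 0)
                return false;                       // gave up at the deadline
            try {
                Thread.sleep(pauseMs);              // give the background thread time to run
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return condition.getAsBoolean();
            }
        }
    }
}
```

For example, with an `AtomicReference` set from the dying thread's uncaught-exception handler, `waitFor(() -> failure.get() != null, 1000, 10)` returns `true` as soon as the background thread has failed, and `false` if it has not done so within one second.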





[GitHub] [kafka] philipnee commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1435476445

   Hey! Just to chime in here: I like the idea of throwing an exception there and it seems fairly straightforward.
   
   To restart the heartbeat thread, is it sufficient to do that at the top of consumer.poll()? I'm basically wondering whether we could piggyback the restart onto the next user poll.
   
   




[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "RivenSun2 (via GitHub)" <gi...@apache.org>.
RivenSun2 commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1434359335

   Hi @guozhangwang @showuon 
   please help to review PR when available.
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1435470639

   > In general, I think that if the background thread dies for whatever reason, we should consider the following actions in order of precedence:
   > 1. Make sure the consumer now falls into an abnormal state in which it does not return data and does not tie up the caller thread. This also serves to notify the user.
   > 2. Try to "self-heal" by re-creating the thread (we do not need to do it in this PR, just laying the groundwork here), in order to bring the consumer back to a normal state.
   > 3. If we cannot self-heal the consumer and it simply becomes useless, let the consumer throw an exception for any API call so that the caller thread can go ahead and create a brand-new consumer.
   
   Good suggestion. I was thinking we could directly throw an exception in the `timeToNextHeartbeat` method when the heartbeat thread has failed. That should be good enough. WDYT?




[GitHub] [kafka] showuon commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1442732996

   @RivenSun2, I think what we're saying here is that we should throw an exception in the main consumer thread as a short-term solution. That is, throw an exception in `timeToNextHeartbeat` if the heartbeat thread has failed. Regarding your concern:
   ```
   try {
       consumer.poll(duration);
   } catch (Exception e) {
       log.error("has error when consumer poll!", e);
   }
   ```
   In this case, it'll be the consumer client's issue, not ours. The consumer should handle each kind of exception differently, not just catch all exceptions and keep retrying. 
   
   So, back to the original question: are you able to update the PR to throw an exception in `timeToNextHeartbeat` if the heartbeat thread has failed? If you don't have time, we can assign it to another contributor. Thank you.
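As a sketch of what handling each kind of exception differently could look like on the application side: retry only errors known to be transient and let everything else escape the loop. `TransientPollException` and `PollLoopSketch` are hypothetical; a real application would key off its client library's own exception types, and the `maxPolls` bound exists only to keep the sketch testable (a production loop would typically run until shutdown).

```java
// Hypothetical marker for errors worth retrying; a real application would use
// its client library's own exception types instead.
class TransientPollException extends RuntimeException {
    TransientPollException(String message) {
        super(message);
    }
}

final class PollLoopSketch {
    private PollLoopSketch() {}

    // Calls poll up to maxPolls times (bounded only to keep the sketch testable).
    // Transient errors are logged and retried; any other RuntimeException, such as
    // one raised because the heartbeat thread died, escapes so the caller can
    // close this consumer and create a new one.
    static int run(Runnable poll, int maxPolls) {
        int attempts = 0;
        while (attempts < maxPolls) {
            attempts++;
            try {
                poll.run();
            } catch (TransientPollException e) {
                System.err.println("transient poll error, retrying: " + e.getMessage());
            }
        }
        return attempts;
    }
}
```

Unlike the catch-all loop quoted above, a fatal error here ends the loop instead of being logged and retried forever.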




[GitHub] [kafka] showuon commented on a diff in pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13270:
URL: https://github.com/apache/kafka/pull/13270#discussion_r1119559892


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -355,6 +355,12 @@ protected synchronized long timeToNextHeartbeat(long now) {
         // we don't need to send heartbeats
         if (state.hasNotJoinedGroup())
             return Long.MAX_VALUE;
+        if (heartbeatThread != null) {

Review Comment:
   > We could 1 line it ya?
   > if (heartbeatThread != null && heartbeatThread.hasFailed()) {
   > ...
   > }
   
   
   Could you address this comment?
   


