You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/04/14 18:59:20 UTC

[kafka] branch 3.4 updated: KAFKA-14054: Handle TimeoutException gracefully (#13534)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new eb616d3ffc9 KAFKA-14054: Handle TimeoutException gracefully (#13534)
eb616d3ffc9 is described below

commit eb616d3ffc94ddb1f92cbb7e29cb591bcc7caceb
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri Apr 14 09:43:53 2023 -0700

    KAFKA-14054: Handle TimeoutException gracefully (#13534)
    
    We incorrectly assumed, that `consumer.position()` should always be
    served by the consumer locally set position.
    
    However, within `commitNeeded()` we check if first `if(commitNeeded)`
    and thus go into the else only if we have not processed data (otherwise,
    `commitNeeded` would be true). For this reason, we actually don't know
    if the consumer has a valid position or not.
    
    We should just swallow a timeout if the consumer cannot get the position
    from the broker, and try the next partition. If any position advances, we
    can return true, and if we timeout for all partitions we can return
    false.
    
    Reviewers: Michal Cabak (@miccab), John Roesler <jo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../apache/kafka/streams/processor/internals/StreamTask.java | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index f3b5818ddfb..2708e10b6c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -1177,13 +1177,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                         commitNeeded = true;
                         entry.setValue(offset - 1);
                     }
-                } catch (final TimeoutException error) {
-                    // the `consumer.position()` call should never block, because we know that we did process data
-                    // for the requested partition and thus the consumer should have a valid local position
-                    // that it can return immediately
-
-                    // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
-                    throw new IllegalStateException(error);
+                } catch (final TimeoutException swallow) {
+                    log.debug(
+                        String.format("Could not get consumer position for partition %s", partition),
+                        swallow
+                    );
                 } catch (final KafkaException fatal) {
                     throw new StreamsException(fatal);
                 }