You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/04/14 16:44:02 UTC

[kafka] branch trunk updated: KAFKA-14054: Handle TimeoutException gracefully (#13534)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 20028e24cca KAFKA-14054: Handle TimeoutException gracefully (#13534)
20028e24cca is described below

commit 20028e24cca91422b8f02fdbf45d2cd9ef24c901
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri Apr 14 09:43:53 2023 -0700

    KAFKA-14054: Handle TimeoutException gracefully (#13534)
    
    We incorrectly assumed, that `consumer.position()` should always be
    served by the consumer locally set position.
    
    However, within `commitNeeded()` we check if first `if(commitNeeded)`
    and thus go into the else only if we have not processed data (otherwise,
    `commitNeeded` would be true). For this reason, we actually don't know
    if the consumer has a valid position or not.
    
    We should just swallow a timeout if the consumer cannot get the position
    from the broker, and try the next partition. If any position advances, we
    can return true, and if we timeout for all partitions we can return
    false.
    
    Reviewers: Michal Cabak (@miccab), John Roesler <jo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../apache/kafka/streams/processor/internals/StreamTask.java | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 86fd3391f1d..22edf797b54 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -1195,13 +1195,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                         commitNeeded = true;
                         entry.setValue(offset - 1);
                     }
-                } catch (final TimeoutException error) {
-                    // the `consumer.position()` call should never block, because we know that we did process data
-                    // for the requested partition and thus the consumer should have a valid local position
-                    // that it can return immediately
-
-                    // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
-                    throw new IllegalStateException(error);
+                } catch (final TimeoutException swallow) {
+                    log.debug(
+                        String.format("Could not get consumer position for partition %s", partition),
+                        swallow
+                    );
                 } catch (final KafkaException fatal) {
                     throw new StreamsException(fatal);
                 }