You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/04 19:14:54 UTC
[kafka] branch trunk updated: KAFKA 8311: better handle timeout
exception on Stream thread (#6662)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 055c9c7 KAFKA 8311: better handle timeout exception on Stream thread (#6662)
055c9c7 is described below
commit 055c9c7bd6780436d201f5101df464814ba940e2
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Tue Jun 4 12:14:35 2019 -0700
KAFKA 8311: better handle timeout exception on Stream thread (#6662)
The goals for this small diff are:
1. Give user guidance if they want to relax commit timeout threshold
2. Indicate the code path where timeout exception was caught
Reviewers: John Roesler <jo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 01b8989..5be065d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1770,7 +1770,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
Collections.singleton(partition), time.timer(timeout));
if (offsets == null) {
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
- "committed offset for partition " + partition + " could be determined");
+ "committed offset for partition " + partition + " could be determined. Try tuning default.api.timeout.ms " +
+ "larger to relax the threshold.");
} else {
offsets.forEach(this::updateLastSeenEpochIfNewer);
return offsets.get(partition);