Posted to users@kafka.apache.org by Pushkar Deole <pd...@gmail.com> on 2020/12/11 16:22:04 UTC

Re: KIP to Gracefully handle timeout exception on kafka streams

Matthias,

By the way, one more of our services recently encountered the exception
below. Can you suggest whether it can also be avoided by tuning a specific
configuration?

{"@timestamp":"2020-11-24T13:33:38.617+00:00","@version":"1","message":"Exception
processing processor thread -
analytics-event-normalizer-d6aeedd6-f53b-4e20-91f6-ee4091041006-StreamThread-3
stream - org.apache.kafka.common.errors.TimeoutException: Timeout of
60000ms expired before the position for partition engagement-18 could be
determined","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-d6aeedd6-f53b-4e20-91f6-ee4091041006-StreamThread-3","level":"ERROR","level_value":40000,"stack_trace":"java.lang.IllegalStateException:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired
before the position for partition engagement-18 could be determined\n\tat
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:510)\n\tat
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:478)\n\tat
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226)\n\tat
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:543)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:977)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\nCaused
by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms
expired before the position for partition engagement-18 could be
determined\n"}



On Sun, Nov 22, 2020 at 12:35 AM Matthias J. Sax <mj...@apache.org> wrote:

> KIP-572 will only ship in 2.8.0.
>
> For the exception you hit, it's `max.block.ms` -- you might also look
> into `default.api.timeout.ms`.
>
> In general, the relevant configs are documented in the JavaDocs of the
> corresponding client method.
>
>
> -Matthias
>
> On 11/20/20 9:11 PM, Pushkar Deole wrote:
> > Thanks Matthias... We are already on Kafka 2.5.0, and
> > https://issues.apache.org/jira/browse/KAFKA-8803 mentions that these
> > types of issues are fixed in 2.5.0.
> >
> > Is KIP-572 planned for 2.7.0?
> >
> > Also, for timeouts and retries, can you tell us which parameters we
> > should configure to higher values for now?
> >
> >
> > On Sat, Nov 21, 2020 at 5:15 AM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> Yes, if brokers are upgraded via rolling bounce, and the embedded
> >> clients are configured with large enough timeouts and retries, they
> >> should just fail over to running brokers if a single broker is bounced.
> >>
> >> If you get a timeout exception, then KafkaStreams dies atm -- we have
> >> KIP-572 in-flight that will improve the situation by adding one more
> >> retry layer within KafkaStreams itself. For now, you would need to
> >> increase the corresponding client timeouts so that the client does not
> >> throw a timeout exception.
> >>
> >> There is however https://issues.apache.org/jira/browse/KAFKA-8803 that
> >> you could have hit, too.
> >>
> >>
> >> -Matthias
> >>
> >> On 11/18/20 7:05 AM, Pushkar Deole wrote:
> >>> Matthias,
> >>>
> >>> We recently ran into an issue where the Kafka brokers were upgraded
> >>> (I guess it was a rolling update) from Aiven business plan 4 to plan 8.
> >>> This involves changes to CPU, memory and storage for each broker.
> >>>
> >>> Since this should be a rolling upgrade, we expected services to survive;
> >>> however, in one service we saw streams run into an error with the
> >>> exception below.
> >>>
> >>> A few questions:
> >>> 1. If a broker goes down, the Kafka Streams client should handle this
> >>> internally and connect to an available broker, since our topic has a
> >>> replication factor equal to the number of brokers. Is this correct?
> >>> 2. The error below says the timeout expired while awaiting
> >>> InitProducerId. What does this signify, and why would this timeout occur
> >>> when other brokers are up and running?
> >>>
> >>> {"@timestamp":"2020-11-16T13:42:31.110+00:00","@version":"1","message":"Unexpected
> >>> exception in stream
> >>> processing.","logger_name":"com.avaya.analytics.filter.ApplicationConfig","thread_name":"analytics-event-filter-StreamThread-1","level":"ERROR","level_value":40000,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
> >>> stream-thread [analytics-event-filter-StreamThread-1] Failed to
> >>> rebalance.\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:862)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\nCaused
> >>> by: org.apache.kafka.streams.errors.StreamsException: stream-thread
> >>> [analytics-event-filter-StreamThread-1] task [1_0] Failed to initialize
> >>> task 1_0 due to timeout.\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamTask.initializeTransactions(StreamTask.java:923)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:206)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295)\n\tat
> >>> org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)\n\tat
> >>> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)\n\tat
> >>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)\n\tat
> >>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)\n\tat
> >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)\n\tat
> >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)\n\tat
> >>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:497)\n\tat
> >>> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1274)\n\tat
> >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)\n\tat
> >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173)\n\tat
> >>> brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:89)\n\tat
> >>> brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:83)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)\n\t...
> >>> 3 common frames omitted\nCaused by:
> >>> org.apache.kafka.common.errors.TimeoutException: Timeout expired after
> >>> 60000milliseconds while awaiting InitProducerId\n"}
> >>>
> >
>

Re: KIP to Gracefully handle timeout exception on kafka streams

Posted by "Matthias J. Sax" <mj...@apache.org>.
Sounds like `default.api.timeout.ms`.

-Matthias
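
For readers who want to try the two settings named in this thread, here is a minimal sketch of how they might be passed through a Kafka Streams configuration. It relies on the Kafka Streams convention that keys prefixed with `producer.` or `consumer.` are forwarded to the embedded clients. The class name, the method name, and the 120000 ms value are illustrative assumptions, not recommendations from the thread; both settings default to 60000 ms, which matches the timeouts in the logs above.

```java
import java.util.Properties;

// Hypothetical helper (name is an assumption): collects timeout overrides
// for the producer and consumer embedded in a Kafka Streams application.
public class StreamsTimeoutOverrides {

    // Kafka Streams forwards keys with these prefixes to the embedded clients.
    static final String PRODUCER_PREFIX = "producer.";
    static final String CONSUMER_PREFIX = "consumer.";

    // Builds the overrides; timeoutMs is an assumed value chosen by the caller.
    static Properties build(long timeoutMs) {
        Properties props = new Properties();
        String value = Long.toString(timeoutMs);
        // Producer side: bounds blocking calls such as the transaction
        // initialization that timed out while awaiting InitProducerId.
        props.setProperty(PRODUCER_PREFIX + "max.block.ms", value);
        // Consumer side: default timeout for consumer API calls, such as
        // determining the position for a partition.
        props.setProperty(CONSUMER_PREFIX + "default.api.timeout.ms", value);
        return props;
    }

    public static void main(String[] args) {
        // 120000 ms is an arbitrary example value, double the 60000 ms default.
        Properties props = build(120_000L);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These entries would be merged into the same `Properties` object that already holds `application.id` and `bootstrap.servers` before it is handed to the `KafkaStreams` constructor. A suitable value depends on how long a broker bounce takes in the given environment.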
