Posted to dev@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2017/05/15 17:25:04 UTC

[jira] [Commented] (KAFKA-5242) add max_number _of_retries to exponential backoff strategy

    [ https://issues.apache.org/jira/browse/KAFKA-5242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010936#comment-16010936 ] 

Matthias J. Sax commented on KAFKA-5242:
----------------------------------------

We fixed a couple of state directory lock issues in {{0.10.2.1}} -- thus I am wondering if it is already fixed there?

About the retry logic: the fact that you created multiple {{KafkaStreams}} instances and started both should not have any influence on the behavior and should not cause the issue (if it hangs forever, it's a bug in lock management and should be independent of starting one or two instances -- multiple instances may expose the bug with higher probability, but they should not be the root cause). You should only get more parallel running instances, and load should be redistributed over more threads. We do retry infinitely atm, as we know that the lock will be released eventually (as long as there is no bug).

We try to keep the number of config values as small as possible. Thus, I am wondering if it might be sufficient to hard-code a max retry time or count? We might also put an upper bound on the back-off time -- the current strategy seems to be too aggressive. WDYT?
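For illustration, a minimal sketch (plain Java, not Kafka's actual code) of what a hard-coded retry cap plus an upper bound on the back-off time could look like; the 50 ms start value, the cap of 10 retries, and the 60 s bound are made-up values for the example:

{code}
// Sketch only: a bounded variant of an exponential-backoff retry loop.
// MAX_RETRIES, MAX_BACKOFF_MS and the 50 ms start value are illustrative,
// not values taken from Kafka Streams.
public class BoundedBackoffSketch {

    static final int MAX_RETRIES = 10;          // hard-coded retry cap
    static final long MAX_BACKOFF_MS = 60_000L; // upper bound on a single sleep

    static void retryWithBoundedBackoff(Runnable task) throws InterruptedException {
        long backoffMs = 50L;                   // hypothetical initial back-off
        for (int attempt = 0; ; attempt++) {
            try {
                task.run();                     // e.g. acquiring the state dir lock
                return;                         // success, stop retrying
            } catch (RuntimeException e) {      // stands in for LockException
                if (attempt >= MAX_RETRIES) {
                    throw e;                    // propagate instead of retrying forever
                }
                Thread.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS); // capped doubling
            }
        }
    }
}
{code}

With a cap like this, the caller (or a client exception handler) gets to decide what to do once retries are exhausted, instead of the thread blocking for ever-growing sleep intervals.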

> add max_number _of_retries to exponential backoff strategy
> ----------------------------------------------------------
>
>                 Key: KAFKA-5242
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5242
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Lukas Gemela
>            Priority: Minor
>
> From time to time, during rebalance, we get a lot of exceptions saying 
> {code}
> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the state directory: /app/db/clio/0_0
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [kafka-streams-0.10.2.0.jar!/:?]
> {code}
> (see attached logfile)
> It was actually a problem on our side - we called startStreams() twice and therefore had two threads touching the same folder structure. 
> But what I've noticed is that the backoff strategy in StreamThread$AbstractTaskCreator.retryWithBackoff can run endlessly - after 20 iterations it takes 6 hours until the next attempt to start a task. 
> I've noticed the latest code contains a check for rebalanceTimeoutMs, but that still does not solve the problem, especially in case MAX_POLL_INTERVAL_MS_CONFIG is set to Integer.MAX_VALUE. At this stage Kafka Streams just hangs indefinitely.
> I would personally make that backoff strategy a bit more configurable with a configured number of retries; once it is exceeded, the exception should be propagated like any other exception to the custom client exception handler.
> (I can provide a patch)
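For a rough sense of why an uncapped doubling back-off ends in the multi-hour gaps described above, here is a quick back-of-the-envelope computation; the 50 ms starting value is an assumption for illustration, not necessarily what retryWithBackoff actually uses:

{code}
// Hypothetical uncapped doubling: wait_n = 50 ms * 2^n.
public class BackoffGrowth {
    public static void main(String[] args) {
        long backoffMs = 50L;                  // assumed initial back-off
        for (int i = 1; i <= 20; i++) {
            backoffMs *= 2;                    // doubles on every failed attempt
            System.out.printf("attempt %2d: next wait = %,d ms%n", i, backoffMs);
        }
        // Final wait: 50 * 2^20 = 52,428,800 ms, i.e. roughly 14.5 hours for a
        // single sleep -- the same order of magnitude ("hours between attempts")
        // as the description reports after ~20 iterations.
    }
}
{code}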


