Posted to dev@kafka.apache.org by "Lukas Gemela (JIRA)" <ji...@apache.org> on 2017/05/15 17:49:04 UTC

[jira] [Comment Edited] (KAFKA-5242) add max_number _of_retries to exponential backoff strategy

    [ https://issues.apache.org/jira/browse/KAFKA-5242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010974#comment-16010974 ] 

Lukas Gemela edited comment on KAFKA-5242 at 5/15/17 5:48 PM:
--------------------------------------------------------------

[~mjsax] by multiple instances, do you mean multiple JVMs (nodes), or multiple instances running within the same JVM process?

What happened was that, by accident, we created two instances running within a single JVM process, touching the same data on the hard drive:
new KafkaStreams(builder, streamsConfig).start();

If this is a possible way to run Kafka Streams, then there is definitely a bug in the locking mechanism. I've attached log files for this situation (clio_170511), unfortunately only with the log level set to INFO.
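
For illustration, this is roughly the accidental setup (config values here are made up; builder is the shared TopologyBuilder):

{code}
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "clio");        // same application id
props.put(StreamsConfig.STATE_DIR_CONFIG, "/app/db/clio");     // same state directory
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Two instances in one JVM process, both locking the same state directory:
new KafkaStreams(builder, new StreamsConfig(props)).start();
new KafkaStreams(builder, new StreamsConfig(props)).start();
{code}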

Regarding the backoff strategy, you could do something similar to what the Akka library does (cap it with a maximal duration):
http://doc.akka.io/japi/akka/2.4/akka/pattern/Backoff.html 
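
For example, a capped exponential backoff along those lines could look like this (an illustrative sketch, not actual Kafka or Akka code):

{code}
import java.util.concurrent.ThreadLocalRandom;

// Sketch: exponential backoff capped at a maximal duration, similar in
// spirit to akka.pattern.Backoff(minBackoff, maxBackoff, randomFactor).
public final class CappedBackoff {
    static long backoffMs(int attempt, long minBackoffMs, long maxBackoffMs, double randomFactor) {
        double exp = minBackoffMs * Math.pow(2, attempt);        // exponential growth
        long capped = (long) Math.min(exp, maxBackoffMs);        // cap at the maximal duration
        double jitter = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor;
        return Math.min((long) (capped * jitter), maxBackoffMs); // keep jittered value under the cap
    }
}
{code}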

Thanks!

L.



> add max_number _of_retries to exponential backoff strategy
> ----------------------------------------------------------
>
>                 Key: KAFKA-5242
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5242
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Lukas Gemela
>            Priority: Minor
>         Attachments: clio_170511.log
>
>
> From time to time during a rebalance, we get a lot of exceptions saying
> {code}
> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the state directory: /app/db/clio/0_0
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [kafka-streams-0.10.2.0.jar!/:?]
> {code}
> (see the attached log file)
> It was actually a problem on our side - we ran startStreams() twice, and therefore we had two threads touching the same folder structure.
> But what I've noticed is that the backoff strategy in StreamThread$AbstractTaskCreator.retryWithBackoff can run endlessly - after 20 iterations it takes 6 hours until the next attempt to start a task.
> I've noticed the latest code contains a check for rebalanceTimeoutMs, but that still does not solve the problem, especially in case MAX_POLL_INTERVAL_MS_CONFIG is set to Integer.MAX_VALUE - at that stage Kafka Streams just hangs indefinitely.
> I would personally make that backoff strategy a bit more configurable, with a maximum number of retries: if it exceeds the configured value, the exception is propagated, like any other exception, to the custom client exception handler, as sketched below.
> (I can provide a patch)
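> Roughly what I have in mind (an illustrative sketch, not the actual StreamThread code; maxRetries would come from a new config):
> {code}
> // Sketch: bounded retry-with-backoff that rethrows once the configured
> // number of retries is exhausted, so the client exception handler sees it.
> void retryWithBackoff(Runnable createTask, int maxRetries, long initialBackoffMs) {
>     long backoffMs = initialBackoffMs;
>     for (int attempt = 1; ; attempt++) {
>         try {
>             createTask.run();
>             return;                       // task created successfully
>         } catch (org.apache.kafka.streams.errors.LockException e) {
>             if (attempt >= maxRetries) {
>                 throw e;                  // propagate like any other exception
>             }
>         }
>         try {
>             Thread.sleep(backoffMs);
>         } catch (InterruptedException ie) {
>             Thread.currentThread().interrupt();
>             return;
>         }
>         backoffMs *= 2;                   // exponential backoff
>     }
> }
> {code}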



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)