Posted to users@kafka.apache.org by Sameer Kumar <sa...@gmail.com> on 2017/05/03 13:04:33 UTC

Kafka Streams Failed to rebalance error

Hi,



I ran two nodes in my Streams compute cluster; they were running fine for a
few hours before failing with rebalance errors.


I couldn't understand why this happened, but I saw one strange behaviour:

at 16:53 on node1 I saw a "Failed to lock the state directory" error. This
might have caused the partitions to relocate, and hence the rebalance error.



I am attaching detailed logs for both nodes; please see if you can help.



Some of the logs, for quick reference:



2017-05-03 16:53:53 ERROR Kafka10Base:44 - Exception caught in thread StreamThread-2

org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] Failed to rebalance
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] failed to suspend stream tasks
    at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:488)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
    ... 1 more
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
    at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
    at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
    at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
    ... 10 more



2017-05-03 16:53:57 WARN  StreamThread:1184 - Could not create task 1_38. Will retry.

org.apache.kafka.streams.errors.LockException: task [1_38] Failed to lock the state directory: /data/streampoc/LIC2-5/1_38
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
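
[For anyone landing here with the same CommitFailedException: a minimal
sketch of the two knobs the exception message points at, set through the
Streams configuration. The application id, bootstrap servers, and the
concrete values are illustrative placeholders, not recommendations from
this thread.]

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class PollTuningSketch {
        static Properties streamsProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // Fewer records per poll() leaves less processing between polls:
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
            // More headroom between poll() calls before the group evicts the member:
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
            return props;
        }
    }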


Regards,

-Sameer.

Re: Kafka Streams Failed to rebalance error

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi,

A `CommitFailedException` can still occur if an instance misses a
rebalance. I think these are two different problems.

Having said this, Streams should recover from `CommitFailedException`
automatically by triggering another rebalance afterwards.

Nevertheless, we know that there is an issue with rebalancing: if
state recreation takes long, a rebalancing instance might miss another
rebalance... This is one of the top-priority things we want to work on
after 0.11 is released.


-Matthias
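
[If long state recreation is the trigger, one mitigation worth trying --
an editor's suggestion, not something Matthias prescribes above -- is
keeping warm standby copies of state so a migrated task does not replay
the whole changelog. A sketch:]

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class StandbySketch {
        static Properties streamsProps() {
            Properties props = new Properties();
            // One warm replica of each state store on another instance shortens
            // the restore phase after a rebalance moves a task.
            props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
            return props;
        }
    }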

On 6/9/17 10:23 AM, João Peixoto wrote:
> I see your point Eno, but truth is, on my real app I am getting
> "CommitFailedException", even though I did not change "max.poll.interval.ms"
> and it remains at Integer.MAX_VALUE.
> 
> I'm further investigating the origin of that exception. My current working
> theory is that if a custom processor throws a runtime exception at the
> wrong time, the above may happen.
> 
> 
> 
> 
> On Fri, Jun 9, 2017 at 9:34 AM Eno Thereska <en...@gmail.com> wrote:
> 
>> Even without a state store the tasks themselves will get rebalanced.
>>
>> So you'll definitely trigger the problem with the 1-2-3 steps you
>> describe; that is confirmed. The reason we increased
>> "max.poll.interval.ms" to basically infinite is simply to avoid this problem.
>>
>> Eno
>>> On 9 Jun 2017, at 07:40, João Peixoto <jo...@gmail.com> wrote:
>>>
>>> I am now able to consistently reproduce this issue with a dummy project.
>>>
>>> 1. Set "max.poll.interval.ms" to a low value
>>> 2. Have the pipeline take longer than the interval above
>>> 3. Profit
>>>
>>> This happens every single time and never recovers.
>>> I simulated the delay by adding a breakpoint in my IDE on a sink "foreach"
>>> step and then proceeding after the above interval had elapsed.
>>>
>>> Any advice on how to work around this using 0.10.2.1 would be greatly
>>> appreciated.
>>> Hope it helps
>>>
>>> On Wed, Jun 7, 2017 at 10:19 PM João Peixoto <jo...@gmail.com>
>>> wrote:
>>>
>>>> But my stream definition does not have a state store at all, RocksDB or
>>>> in-memory... That's the most concerning part...
>>>> On Wed, Jun 7, 2017 at 9:48 PM Sachin Mittal <sj...@gmail.com>
>>>> wrote:
>>>>
>>>>> One instance with 10 threads may cause RocksDB issues.
>>>>> How much RAM do you have?
>>>>>
>>>>> Also check CPU wait time. Many RocksDB instances on one machine (depending
>>>>> on the number of partitions) may cause a lot of disk I/O, increasing wait
>>>>> times, slowing down message processing, and hence causing frequent
>>>>> rebalances.
>>>>>
>>>>> Also, how many partitions do your topics have? My experience is that one
>>>>> thread per partition is ideal.
>>>>>
>>>>> Thanks
>>>>> Sachin
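
[Sachin's one-thread-per-partition rule of thumb maps to a single Streams
setting; a sketch assuming a 10-partition input topic on one instance:]

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class ThreadSizingSketch {
        static Properties streamsProps() {
            Properties props = new Properties();
            // One stream thread per input partition; 10 partitions assumed here.
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
            return props;
        }
    }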
>>>>>
>>>>>
>>>>> On Thu, Jun 8, 2017 at 9:58 AM, João Peixoto <jo...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> There is one instance with 10 threads.
>>>>>>
>>>>>> On Wed, Jun 7, 2017 at 9:07 PM Guozhang Wang <wa...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> João,
>>>>>>>
>>>>>>> Do you also have multiple instances running in parallel, and how many
>>>>>>> threads are you running within each instance?
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jun 7, 2017 at 3:18 PM, João Peixoto <joao.hartimer@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Eno, before I do so I just want to be sure this would not be a duplicate.
>>>>>>>> I just found the following issues:
>>>>>>>>
>>>>>>>> * https://issues.apache.org/jira/browse/KAFKA-5167. Marked as being
>>>>>>>>   fixed on 0.11.0.0/0.10.2.2 (both not released afaik)
>>>>>>>> * https://issues.apache.org/jira/browse/KAFKA-5070. Currently in progress
>>>>>>>>
>>>>>>>> On Wed, Jun 7, 2017 at 2:24 PM Eno Thereska <eno.thereska@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi there,
>>>>>>>>>
>>>>>>>>> This might be a bug -- would you mind opening a JIRA? (Copy-pasting
>>>>>>>>> the below is sufficient.)
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Eno
>>>>>>>>>> On 7 Jun 2017, at 21:38, João Peixoto <jo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> I'm using Kafka Streams 0.10.2.1 and I still see this error
>>>>>>>>>>
>>>>>>>>>> 2017-06-07 20:28:37.211  WARN 73 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : Could not create task 0_31. Will retry.
>>>>>>>>>>
>>>>>>>>>> org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock the state directory for task 0_31
>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [kafka-clients-0.10.2.1.jar!/:na]
>>>>>>>>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [kafka-clients-0.10.2.1.jar!/:na]
>>>>>>>>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na]
>>>>>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na]
>>>>>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na]
>>>>>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.1.jar!/:na]
>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> It has been printing it for hours now, so it does not recover at
>>>>>> all.
>>>>>>>>>> The most worrying thing is that this stream definition does not
>>>>>> even
>>>>>>>> use
>>>>>>>>>> state stores, it literally looks like this:
>>>>>>>>>>
>>>>>>>>>> KStreamBuilder builder = new KStreamBuilder();
>>>>>>>>>>       KStream<byte[], Message> kStream =
>>>>>>>>>> builder.stream(appOptions.getInput().getTopic());
>>>>>>>>>>       kStream.process(() -> processor);
>>>>>>>>>>       new KafkaStreams(builder, streamsConfiguration);
>>>>>>>>>>
>>>>>>>>>> The "processor" does its thing and calls "context().commit()"
>>>>> when
>>>>>>>> done.
>>>>>>>>>> That's it. Looking at the actual machine running the instance,
>>>>> the
>>>>>>>>> folders
>>>>>>>>>> under /tmp/kafka-streams/<stream name>/ only have a .lock file.
>>>>>>>>>>
>>>>>>>>>> This seems to have been bootstrapped by the exception:
>>>>>>>>>>
>>>>>>>>>> org.apache.kafka.clients.consumer.CommitFailedException: Commit
>>>>>>>> cannot be
>>>>>>>>>> completed since the group has already rebalanced and assigned
>>>>> the
>>>>>>>>>> partitions to another member. This means that the time between
>>>>>>>> subsequent
>>>>>>>>>> calls to poll() was longer than the configured
>>>>>> max.poll.interval.ms,
>>>>>>>>> which
>>>>>>>>>> typically implies that the poll loop is spending too much time
>>>>>>> message
>>>>>>>>>> processing. You can address this either by increasing the
>>>>> session
>>>>>>>> timeout
>>>>>>>>>> or by reducing the maximum size of batches returned in poll()
>>>>> with
>>>>>>>>>> max.poll.records.
>>>>>>>>>>
>>>>>>>>>> We are addressing the latter by reducing "max.poll.records" and
>>>>>>>>> increasing "
>>>>>>>>>> commit.interval.ms", nonetheless, shouldn't Kafka Streams not
>>>>>> worry
>>>>>>>>> about
>>>>>>>>>> state dirs if there are no state stores? Since it doesn't seem
>>>>> to
>>>>>> do
>>>>>>> so
>>>>>>>>>> automatically, can I configured it somehow to achieve this end?
>>>>>>>>>>
>>>>>>>>>> Additionally, what could lead to it not being able to recover?
>>>>>>>>>>
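
[A self-contained version of the topology João sketches above, for anyone
reproducing his setup on the 0.10.2 API. The topic name, serdes, and the
foreach body are placeholders standing in for his appOptions/processor
references; note the streams.start() call his excerpt elides.]

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class StatelessPipeline {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stateless-pipeline"); // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

            KStreamBuilder builder = new KStreamBuilder();
            KStream<byte[], byte[]> stream =
                builder.stream(Serdes.ByteArray(), Serdes.ByteArray(), "input-topic"); // placeholder
            // Stand-in for the custom processor's per-record work:
            stream.foreach((key, value) -> { });

            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();
        }
    }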
>>>>>>>>>> On Tue, May 16, 2017 at 3:17 PM Matthias J. Sax <matthias@confluent.io>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Great! :)
>>>>>>>>>>>
>>>>>>>>>>> On 5/16/17 2:31 AM, Sameer Kumar wrote:
>>>>>>>>>>>> I see now that my Kafka cluster is very stable, and these errors
>>>>>>>>>>>> don't come up now.
>>>>>>>>>>>>
>>>>>>>>>>>> -Sameer.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, May 5, 2017 at 7:53 AM, Sameer Kumar <sam.kum.work@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, I have upgraded both my cluster and client to version 10.2.1
>>>>>>>>>>>>> and am currently monitoring the situation.
>>>>>>>>>>>>> Will report back in case I find any errors. Thanks for the help though.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Sameer.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, May 5, 2017 at 3:37 AM, Matthias J. Sax <matthias@confluent.io>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Did you see Eno's reply?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Please try out Streams 0.10.2.1 -- this should be fixed there.
>>>>>>>>>>>>>> If not, please report back.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I would also recommend subscribing to the list. It's self-service:
>>>>>>>>>>>>>> http://kafka.apache.org/contact
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 5/3/17 10:49 PM, Sameer Kumar wrote:
>>>>>>>>>>>>>>> My brokers are on version 10.1.0 and my clients are on version 10.2.0.
>>>>>>>>>>>>>>> Also, please reply to all; I am currently not subscribed to the list.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Sameer.


Re: Kafka Streams Failed to rebalance error

Posted by João Peixoto <jo...@gmail.com>.
I see your point Eno, but truth is, on my real app I am getting
"CommitFailedException", even though I did not change "max.poll.interval.ms"
and it remains at Integer.MAX_VALUE.

I'm further investigating the origin of that exception. My current working
theory is that if a custom processor throws a runtime exception at the
wrong time, the above may happen.
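
[If that theory holds, one defensive pattern is to keep runtime exceptions
from escaping the processor, so the stream thread keeps polling and
committing. A sketch against the 0.10.2 Processor API; handleRecord and
the error handling are placeholders for the real logic:]

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class SafeProcessor implements Processor<byte[], byte[]> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(byte[] key, byte[] value) {
            try {
                handleRecord(key, value); // placeholder for the real work
                context.commit();
            } catch (RuntimeException e) {
                // Log and skip (or route to a dead-letter topic) rather than let
                // the exception escape and take down the stream thread mid-batch.
                System.err.println("Record failed, skipping: " + e);
            }
        }

        private void handleRecord(byte[] key, byte[] value) { /* ... */ }

        @Override
        public void punctuate(long timestamp) { }

        @Override
        public void close() { }
    }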




Re: Kafka Streams Failed to rebalance error

Posted by Eno Thereska <en...@gmail.com>.
Even without a state store, the tasks themselves will still get rebalanced.

So you'll definitely trigger the problem with the 1-2-3 steps you describe; that is confirmed. The reason we increased "max.poll.interval.ms" to basically infinite is simply to avoid this problem.

Eno
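
[Context on the default Eno mentions: per this thread, Streams 0.10.2.x
already sets the consumer's max.poll.interval.ms to Integer.MAX_VALUE. A
sketch of restating it explicitly, which only guards against an accidental
override elsewhere in your configuration:]

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class PollIntervalSketch {
        static Properties streamsProps() {
            Properties props = new Properties();
            // Effectively infinite: a slow pipeline step can no longer exceed the
            // interval and trigger the CommitFailedException rebalance loop.
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
            return props;
        }
    }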
> On 9 Jun 2017, at 07:40, João Peixoto <jo...@gmail.com> wrote:
> 
> I am now able to consistently reproduce this issue with a dummy project.
> 
> 1. Set "max.poll.interval.ms" to a low value
> 2. Have the pipeline take longer than the interval above
> 3. Profit
> 
> This happens every single time and never recovers.
> I simulated the delay by adding a breakpoint on my IDE on a sink "foreach"
> step and then proceeding after the above interval had elapsed.
> 
> Any advice on how to work around this using 0.10.2.1 would be greatly
> appreciated.
> Hope it helps
> 
> On Wed, Jun 7, 2017 at 10:19 PM João Peixoto <jo...@gmail.com>
> wrote:
> 
>> But my stream definition does not have a state store at all, Rocksdb or in
>> memory... That's the most concerning part...
>> On Wed, Jun 7, 2017 at 9:48 PM Sachin Mittal <sj...@gmail.com> wrote:
>> 
>>> One instance with 10 threads may cause rocksdb issues.
>>> What is the RAM you have?
>>> 
>>> Also check CPU wait time. Many rocks db instances on one machine (depends
>>> upon number of partitions) may cause lot of disk i/o causing wait times to
>>> increase and hence slowing down the message processing causing frequent
>>> rebalance's.
>>> 
>>> Also what is your topic partitions. My experience is having one thread per
>>> partition is ideal.
>>> 
>>> Thanks
>>> Sachin
>>> 
>>> 
>>> On Thu, Jun 8, 2017 at 9:58 AM, João Peixoto <jo...@gmail.com>
>>> wrote:
>>> 
>>>> There is one instance with 10 threads.
>>>> 
>>>> On Wed, Jun 7, 2017 at 9:07 PM Guozhang Wang <wa...@gmail.com>
>>> wrote:
>>>> 
>>>>> João,
>>>>> 
>>>>> Do you also have multiple running instances in parallel, and how many
>>>>> threads are your running within each instance?
>>>>> 
>>>>> Guozhang
>>>>> 
>>>>> 
>>>>> On Wed, Jun 7, 2017 at 3:18 PM, João Peixoto <joao.hartimer@gmail.com
>>>> 
>>>>> wrote:
>>>>> 
>>>>>> Eno before I do so I just want to be sure this would not be a
>>>> duplicate.
>>>>> I
>>>>>> just found the following issues:
>>>>>> 
>>>>>> * https://issues.apache.org/jira/browse/KAFKA-5167. Marked as being
>>>>> fixed
>>>>>> on 0.11.0.0/0.10.2.2 (both not released afaik)
>>>>>> * https://issues.apache.org/jira/browse/KAFKA-5070. Currently in
>>>>> progress
>>>>>> 
>>>>>> On Wed, Jun 7, 2017 at 2:24 PM Eno Thereska <eno.thereska@gmail.com
>>>> 
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi there,
>>>>>>> 
>>>>>>> This might be a bug, would you mind opening a JIRA (copy-pasting
>>>> below
>>>>> is
>>>>>>> sufficient).
>>>>>>> 
>>>>>>> Thanks
>>>>>>> Eno
>>>>>>>> On 7 Jun 2017, at 21:38, João Peixoto <jo...@gmail.com>
>>>>> wrote:
>>>>>>>> 
>>>>>>>> I'm using Kafka Streams 0.10.2.1 and I still see this error
>>>>>>>> 
>>>>>>>> 2017-06-07 20:28:37.211  WARN 73 --- [ StreamThread-1]
>>>>>>>> o.a.k.s.p.internals.StreamThread         : Could not create task
>>>>> 0_31.
>>>>>>> Will
>>>>>>>> retry.
>>>>>>>> 
>>>>>>>> org.apache.kafka.streams.errors.LockException: task [0_31]
>>> Failed
>>>> to
>>>>>> lock
>>>>>>>> the state directory for task 0_31
>>>>>>>> at
>>>>>>>> 
>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>>> ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>>>>>>>> ~[kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>> at
>>>>>>>> 
>>>>>>> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(
>>>>>> AbstractTask.java:73)
>>>>>>>> ~[kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>> at
>>>>>>>> 
>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>>> StreamTask.<init>(StreamTask.java:108)
>>>>>>>> ~[kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>> at
>>>>>>>> 
>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>>> StreamThread.createStreamTask(StreamThread.java:864)
>>>>>>>> [kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>> at
>>>>>>>> 
>>>>>>> org.apache.kafka.streams.processor.internals.
>>>> StreamThread$TaskCreator.
>>>>>> createTask(StreamThread.java:1237)
>>>>>>>> ~[kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>> at
>>>>>>>> 
>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread$
>>>>>> AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>>>>>>>> ~[kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>> at
>>>>>>>> 
>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>>> StreamThread.addStreamTasks(StreamThread.java:967)
>>>>>>>> [kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>> at
>>>>>>>> 
>>>>>>> org.apache.kafka.streams.processor.internals.
>>>> StreamThread.access$600(
>>>>>> StreamThread.java:69)
>>>>>>>> [kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>> at
>>>>>>>> 
>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread$1.
>>>>>> onPartitionsAssigned(StreamThread.java:234)
>>>>>>>> [kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>> at
>>>>>>>> 
>>>>>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
>>>>>> onJoinComplete(ConsumerCoordinator.java:259)
>>>>>>>> [kafka-clients-0.10.2.1.jar!/:na]
>>>>>>>> at
>>>>>>>> 
>>>>>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
>>>>>> joinGroupIfNeeded(AbstractCoordinator.java:352)
>>>>>>>> [kafka-clients-0.10.2.1.jar!/:na]
>>>>>>>> at
>>>>>>>> 
>>>>>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
>>>>>> ensureActiveGroup(AbstractCoordinator.java:303)
>>>>>>>> [kafka-clients-0.10.2.1.jar!/:na]
>>>>>>>> at
>>>>>>>> 
>>>>>>> org.apache.kafka.clients.consumer.internals.
>>>> ConsumerCoordinator.poll(
>>>>>> ConsumerCoordinator.java:290)
>>>>>>>> [kafka-clients-0.10.2.1.jar!/:na]
>>>>>>>> at
>>>>>>>> 
>>>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.
>>>>>> pollOnce(KafkaConsumer.java:1029)
>>>>>>>> [kafka-clients-0.10.2.1.jar!/:na]
>>>>>>>> at
>>>>>>>> 
>>>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
>>>>>> KafkaConsumer.java:995)
>>>>>>>> [kafka-clients-0.10.2.1.jar!/:na]
>>>>>>>> at
>>>>>>>> 
>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>>>>>> StreamThread.java:592)
>>>>>>>> [kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>> at
>>>>>>>> 
>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>>> StreamThread.run(StreamThread.java:361)
>>>>>>>> [kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>> 
>>>>>>>> 
>>>>>>>> It has been printing it for hours now, so it does not recover at
>>>> all.
>>>>>>>> The most worrying thing is that this stream definition does not
>>>> even
>>>>>> use
>>>>>>>> state stores, it literally looks like this:
>>>>>>>> 
>>>>>>>> KStreamBuilder builder = new KStreamBuilder();
>>>>>>>>       KStream<byte[], Message> kStream =
>>>>>>>> builder.stream(appOptions.getInput().getTopic());
>>>>>>>>       kStream.process(() -> processor);
>>>>>>>>       new KafkaStreams(builder, streamsConfiguration);
>>>>>>>> 
>>>>>>>> The "processor" does its thing and calls "context().commit()"
>>> when
>>>>>> done.
>>>>>>>> That's it. Looking at the actual machine running the instance,
>>> the
>>>>>>> folders
>>>>>>>> under /tmp/kafka-streams/<stream name>/ only have a .lock file.
>>>>>>>> 
>>>>>>>> This seems to have been bootstrapped by the exception:
>>>>>>>> 
>>>>>>>> org.apache.kafka.clients.consumer.CommitFailedException: Commit
>>>>>> cannot be
>>>>>>>> completed since the group has already rebalanced and assigned
>>> the
>>>>>>>> partitions to another member. This means that the time between
>>>>>> subsequent
>>>>>>>> calls to poll() was longer than the configured
>>>> max.poll.interval.ms,
>>>>>>> which
>>>>>>>> typically implies that the poll loop is spending too much time
>>>>> message
>>>>>>>> processing. You can address this either by increasing the
>>> session
>>>>>> timeout
>>>>>>>> or by reducing the maximum size of batches returned in poll()
>>> with
>>>>>>>> max.poll.records.
>>>>>>>> 
>>>>>>>> We are addressing the latter by reducing "max.poll.records" and
>>>>>>> increasing "
>>>>>>>> commit.interval.ms", nonetheless, shouldn't Kafka Streams not
>>>> worry
>>>>>>> about
>>>>>>>> state dirs if there are no state stores? Since it doesn't seem
>>> to
>>>> do
>>>>> so
>>>>>>>> automatically, can I configured it somehow to achieve this end?
>>>>>>>> 
>>>>>>>> Additionally, what could lead to it not being able to recover?


Re: Kafka Streams Failed to rebalance error

Posted by João Peixoto <jo...@gmail.com>.
To help out I made the project that reproduces this issue publicly
available at https://github.com/Hartimer/kafka-stream-issue


Re: Kafka Streams Failed to rebalance error

Posted by João Peixoto <jo...@gmail.com>.
I am now able to consistently reproduce this issue with a dummy project.

1. Set "max.poll.interval.ms" to a low value
2. Have the pipeline take longer than the interval above
3. Profit

This happens every single time and never recovers.
I simulated the delay by adding a breakpoint in my IDE on a sink "foreach"
step and then resuming after the above interval had elapsed.

Any advice on how to work around this using 0.10.2.1 would be greatly
appreciated.
Hope it helps.
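
For reference, a minimal sketch of the kind of topology that triggers the
loop; the topic name, application id, broker address, and artificial delay
are illustrative placeholders, not taken from the actual project:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class RebalanceRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rebalance-repro");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Step 1: a deliberately low poll interval (10s).
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000);

        // Default serdes are byte arrays, so no serde config is needed.
        KStreamBuilder builder = new KStreamBuilder();
        builder.<byte[], byte[]>stream("input-topic")
                .foreach((key, value) -> {
                    // Step 2: handling one record outlasts max.poll.interval.ms,
                    // so by the next poll() the group has already rebalanced.
                    try {
                        Thread.sleep(30000L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });

        new KafkaStreams(builder, props).start();
    }
}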


Re: Kafka Streams Failed to rebalance error

Posted by João Peixoto <jo...@gmail.com>.
But my stream definition does not have a state store at all, RocksDB or
in-memory... That's the most concerning part...
On Wed, Jun 7, 2017 at 9:48 PM Sachin Mittal <sj...@gmail.com> wrote:

> One instance with 10 threads may cause RocksDB issues.
> How much RAM do you have?
>
> Also check CPU wait time. Many RocksDB instances on one machine (the count
> depends on the number of partitions) can cause a lot of disk I/O, driving up
> wait times and slowing down message processing, which in turn causes
> frequent rebalances.
>
> Also, how many partitions does your topic have? In my experience, one thread
> per partition is ideal.
>
> Thanks
> Sachin
>
>
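
For concreteness, Sachin's sizing rule in config terms (a sketch; the
partition count and state path below are assumed examples, not an actual
setup):

// Assuming (for illustration) a 10-partition input topic and a single
// instance: 10 stream threads means one task per thread.
Properties props = new Properties();
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
// Keep RocksDB state on a disk that is not already saturated with I/O.
props.put(StreamsConfig.STATE_DIR_CONFIG, "/data/kafka-streams");
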
> On Thu, Jun 8, 2017 at 9:58 AM, João Peixoto <jo...@gmail.com> wrote:
>
> > There is one instance with 10 threads.
> >
> > On Wed, Jun 7, 2017 at 9:07 PM Guozhang Wang <wa...@gmail.com> wrote:
> >
> > > João,
> > >
> > > Do you also have multiple running instances in parallel, and how many
> > > threads are you running within each instance?
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Jun 7, 2017 at 3:18 PM, João Peixoto <jo...@gmail.com> wrote:
> > >
> > > > Eno, before I do so I just want to be sure this would not be a
> > > > duplicate. I just found the following issues:
> > > >
> > > > * https://issues.apache.org/jira/browse/KAFKA-5167. Marked as being
> > > >   fixed in 0.11.0.0/0.10.2.2 (both not released afaik)
> > > > * https://issues.apache.org/jira/browse/KAFKA-5070. Currently in progress
> > > >
> > > > On Wed, Jun 7, 2017 at 2:24 PM Eno Thereska <en...@gmail.com> wrote:
> > > >
> > > > > Hi there,
> > > > >
> > > > > This might be a bug; would you mind opening a JIRA? (Copy-pasting the
> > > > > text below is sufficient.)
> > > > >
> > > > > Thanks
> > > > > Eno
> > > > > > On 7 Jun 2017, at 21:38, João Peixoto <jo...@gmail.com> wrote:
> > > > > >
> > > > > > I'm using Kafka Streams 0.10.2.1 and I still see this error
> > > > > >
> > > > > > 2017-06-07 20:28:37.211  WARN 73 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : Could not create task 0_31. Will retry.
> > > > > >
> > > > > > org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock the state directory for task 0_31
> > > > > > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [kafka-clients-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [kafka-clients-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [kafka-streams-0.10.2.1.jar!/:na]
> > > > > >
> > > > > >
> > > > > > It has been printing it for hours now, so it does not recover at all.
> > > > > > The most worrying thing is that this stream definition does not even
> > > > > > use state stores, it literally looks like this:
> > > > > >
> > > > > > KStreamBuilder builder = new KStreamBuilder();
> > > > > > KStream<byte[], Message> kStream = builder.stream(appOptions.getInput().getTopic());
> > > > > > kStream.process(() -> processor);
> > > > > > new KafkaStreams(builder, streamsConfiguration);
> > > > > >
> > > > > > The "processor" does its thing and calls "context().commit()"
> when
> > > > done.
> > > > > > That's it. Looking at the actual machine running the instance,
> the
> > > > > folders
> > > > > > under /tmp/kafka-streams/<stream name>/ only have a .lock file.
> > > > > >
> > > > > > This seems to have been triggered by the exception:
> > > > > >
> > > > > > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
> > > > > > be completed since the group has already rebalanced and assigned the
> > > > > > partitions to another member. This means that the time between subsequent
> > > > > > calls to poll() was longer than the configured max.poll.interval.ms, which
> > > > > > typically implies that the poll loop is spending too much time message
> > > > > > processing. You can address this either by increasing the session timeout
> > > > > > or by reducing the maximum size of batches returned in poll() with
> > > > > > max.poll.records.
> > > > > >
> > > > > > We are addressing the latter by reducing "max.poll.records" and
> > > > > > increasing "commit.interval.ms". Nonetheless, shouldn't Kafka Streams
> > > > > > ignore state dirs if there are no state stores? Since it doesn't seem
> > > > > > to do so automatically, can I configure it somehow to achieve this end?
> > > > > >
> > > > > > Additionally, what could lead to it not being able to recover?
> > > > > >
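
For reference, the mitigation described above in config terms (a sketch; the
values are placeholders, not tuned recommendations):

// Passed to new KafkaStreams(builder, props) as in any Streams app.
Properties props = new Properties();
// Fewer records per poll() keeps one loop iteration inside max.poll.interval.ms
// even when individual records are slow to process.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
// A longer commit interval means less of each loop is spent in commitSync().
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 60000);
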
> > > > > > On Tue, May 16, 2017 at 3:17 PM Matthias J. Sax <matthias@confluent.io> wrote:
> > > > > >
> > > > > >> Great! :)
> > > > > >>
> > > > > >> On 5/16/17 2:31 AM, Sameer Kumar wrote:
> > > > > >>> I see now that my Kafka cluster is very stable, and these errors
> > > > > >>> don't come now.
> > > > > >>>
> > > > > >>> -Sameer.
> > > > > >>>
> > > > > >>> On Fri, May 5, 2017 at 7:53 AM, Sameer Kumar <sam.kum.work@gmail.com> wrote:
> > > > > >>>
> > > > > >>>> Yes, I have upgraded my cluster and client both to version 10.2.1
> > > > > >>>> and am currently monitoring the situation.
> > > > > >>>> Will report back in case I find any errors. Thanks for the help though.
> > > > > >>>>
> > > > > >>>> -Sameer.
> > > > > >>>>
> > > > > >>>> On Fri, May 5, 2017 at 3:37 AM, Matthias J. Sax <matthias@confluent.io> wrote:
> > > > > >>>>
> > > > > >>>>> Did you see Eno's reply?
> > > > > >>>>>
> > > > > >>>>> Please try out Streams 0.10.2.1 -- this should be fixed there. If
> > > > > >>>>> not, please report back.
> > > > > >>>>>
> > > > > >>>>> I would also recommend subscribing to the list. It's self-service:
> > > > > >>>>> http://kafka.apache.org/contact
> > > > > >>>>>
> > > > > >>>>> -Matthias
> > > > > >>>>>
> > > > > >>>>> On 5/3/17 10:49 PM, Sameer Kumar wrote:
> > > > > >>>>>> My brokers are on version 10.1.0 and my clients are on version 10.2.0.
> > > > > >>>>>> Also, please reply to all; I am currently not subscribed to the list.
> > > > > >>>>>>
> > > > > >>>>>> -Sameer.
> > > > > >>>>>>>
> > > > > >>>>>>>                at org.apache.kafka.streams.proce
> > > > > >>>>>>>
> ssor.internals.StreamThread.suspendTasksAndState(StreamThrea
> > > > > >>>>> d.java:480)
> > > > > >>>>>>>
> > > > > >>>>>>>                ... 10 more
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>> 2017-05-03 16:53:57 WARN  StreamThread:1184 - Could not
> > create
> > > > task
> > > > > >>>>> 1_38.
> > > > > >>>>>>> Will retry.
> > > > > >>>>>>>
> > > > > >>>>>>> org.apache.kafka.streams.errors.LockException: task [1_38]
> > > > Failed
> > > > > to
> > > > > >>>>> lock
> > > > > >>>>>>> the state directory: /data/streampoc/LIC2-5/1_38
> > > > > >>>>>>>
> > > > > >>>>>>>                at org.apache.kafka.streams.proce
> > > > > >>>>>>>
> ssor.internals.ProcessorStateManager.<init>(ProcessorStateMa
> > > > > >>>>>>> nager.java:102)
> > > > > >>>>>>>
> > > > > >>>>>>>                at org.apache.kafka.streams.proce
> > > > > >>>>>>> ssor.internals.AbstractTask.<init>(AbstractTask.java:73)
> > > > > >>>>>>>
> > > > > >>>>>>>                at org.apache.kafka.streams.proce
> > > > > >>>>>>> ssor.internals.StreamTask.<init>(StreamTask.java:108)
> > > > > >>>>>>>
> > > > > >>>>>>>                at org.apache.kafka.streams.proce
> > > > > >>>>>>> ssor.internals.StreamThread.createStreamTask(StreamThread.
> > > > java:834)
> > > > > >>>>>>>
> > > > > >>>>>>>                at org.apache.kafka.streams.proce
> > > > > >>>>>>>
> ssor.internals.StreamThread$TaskCreator.createTask(StreamThr
> > > > > >>>>> ead.java:1207)
> > > > > >>>>>>>
> > > > > >>>>>>>                at org.apache.kafka.streams.proce
> > > > > >>>>>>>
> ssor.internals.StreamThread$AbstractTaskCreator.retryWithBac
> > > > > >>>>>>> koff(StreamThread.java:1180)
> > > > > >>>>>>>
> > > > > >>>>>>>                at org.apache.kafka.streams.proce
> > > > > >>>>>>> ssor.internals.StreamThread.addStreamTasks(StreamThread.
> > > > java:937)
> > > > > >>>>>>>
> > > > > >>>>>>>                at org.apache.kafka.streams.proce
> > > > > >>>>>>>
> ssor.internals.StreamThread.access$500(StreamThread.java:69)
> > > > > >>>>>>>
> > > > > >>>>>>>                at org.apache.kafka.streams.proce
> > > > > >>>>>>>
> ssor.internals.StreamThread$1.onPartitionsAssigned(StreamThr
> > > > > >>>>> ead.java:236)
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>> Regards,
> > > > > >>>>>>>
> > > > > >>>>>>> -Sameer.
> > > > > >>>>>>>
> > > > > >>>>>>
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>

Re: Kafka Streams Failed to rebalance error

Posted by Sachin Mittal <sj...@gmail.com>.
One instance with 10 threads may cause RocksDB issues.
How much RAM do you have?

Also check CPU wait time. Many RocksDB instances on one machine (how many
depends on the number of partitions) can cause a lot of disk I/O, pushing
wait times up, slowing message processing, and hence triggering frequent
rebalances.
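
For illustration, one way to bound RocksDB's memory appetite per store is a
custom config setter. This is only a sketch: the interface and property name
exist in Streams 0.10.2.x, but the sizes are made-up assumptions, not
recommendations.

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

// Bounds each store's memtable memory so that many RocksDB instances on
// one machine stay within RAM.
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        options.setWriteBufferSize(8 * 1024 * 1024L); // 8 MB memtable per store (illustrative)
        options.setMaxWriteBufferNumber(2);           // at most 2 memtables per store (illustrative)
    }
}

// Registered on the streams configuration:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
//           BoundedMemoryRocksDBConfig.class);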

Also, how many partitions do your topics have? In my experience, one thread
per partition is ideal.
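
As a sketch of that sizing rule (NUM_STREAM_THREADS_CONFIG is the real
StreamsConfig constant; the counts are made-up examples):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// One thread per partition: e.g. a 10-partition input topic on a single
// instance runs 10 threads; across two instances, 5 threads each.
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);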

Thanks
Sachin


On Thu, Jun 8, 2017 at 9:58 AM, João Peixoto <jo...@gmail.com>
wrote:

> There is one instance with 10 threads.
>

Re: Kafka Streams Failed to rebalance error

Posted by João Peixoto <jo...@gmail.com>.
There is one instance with 10 threads.

On Wed, Jun 7, 2017 at 9:07 PM Guozhang Wang <wa...@gmail.com> wrote:

> João,
>
> Do you also have multiple running instances in parallel, and how many
> threads are you running within each instance?
>
> Guozhang
>

Re: Kafka Streams Failed to rebalance error

Posted by Guozhang Wang <wa...@gmail.com>.
João,

Do you also have multiple running instances in parallel, and how many
threads are you running within each instance?

Guozhang


On Wed, Jun 7, 2017 at 3:18 PM, João Peixoto <jo...@gmail.com>
wrote:

> Eno, before I do so I just want to be sure this would not be a duplicate. I
> just found the following issues:
>
> * https://issues.apache.org/jira/browse/KAFKA-5167. Marked as being fixed
> on 0.11.0.0/0.10.2.2 (both not released afaik)
> * https://issues.apache.org/jira/browse/KAFKA-5070. Currently in progress
>

-- 
-- Guozhang

Re: Kafka Streams Failed to rebalance error

Posted by João Peixoto <jo...@gmail.com>.
Eno, before I do so I just want to be sure this would not be a duplicate. I
just found the following issues:

* https://issues.apache.org/jira/browse/KAFKA-5167. Marked as being fixed
on 0.11.0.0/0.10.2.2 (both not released afaik)
* https://issues.apache.org/jira/browse/KAFKA-5070. Currently in progress
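
The mitigation described in the report quoted below comes down to two
settings. A minimal sketch, assuming the usual pass-through of consumer
configs to Streams' internal consumers; the constant names are real, the
values purely illustrative:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Fewer records per poll() keeps each loop iteration short, so the thread
// is less likely to exceed max.poll.interval.ms and be kicked from the group.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);    // illustrative
// Committing less often cuts the commitSync() work done inside the loop.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000); // illustrative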

On Wed, Jun 7, 2017 at 2:24 PM Eno Thereska <en...@gmail.com> wrote:

> Hi there,
>
> This might be a bug; would you mind opening a JIRA? (Copy-pasting below is
> sufficient.)
>
> Thanks
> Eno
> > On 7 Jun 2017, at 21:38, João Peixoto <jo...@gmail.com> wrote:
> >
> > I'm using Kafka Streams 0.10.2.1 and I still see this error
> >
> > 2017-06-07 20:28:37.211  WARN 73 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : Could not create task 0_31. Will retry.
> >
> > org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock the state directory for task 0_31
> >         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[kafka-streams-0.10.2.1.jar!/:na]
> >         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.1.jar!/:na]
> >         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.1.jar!/:na]
> >         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [kafka-streams-0.10.2.1.jar!/:na]
> >         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[kafka-streams-0.10.2.1.jar!/:na]
> >         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[kafka-streams-0.10.2.1.jar!/:na]
> >         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [kafka-streams-0.10.2.1.jar!/:na]
> >         at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na]
> >         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [kafka-streams-0.10.2.1.jar!/:na]
> >         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [kafka-clients-0.10.2.1.jar!/:na]
> >         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [kafka-clients-0.10.2.1.jar!/:na]
> >         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na]
> >         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na]
> >         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na]
> >         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.1.jar!/:na]
> >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [kafka-streams-0.10.2.1.jar!/:na]
> >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [kafka-streams-0.10.2.1.jar!/:na]
> >
> >
> > It has been printing it for hours now, so it does not recover at all.
> > The most worrying thing is that this stream definition does not even use
> > state stores; it literally looks like this:
> >
> > KStreamBuilder builder = new KStreamBuilder();
> > KStream<byte[], Message> kStream = builder.stream(appOptions.getInput().getTopic());
> > kStream.process(() -> processor);
> > new KafkaStreams(builder, streamsConfiguration);
> >
> > The "processor" does its thing and calls "context().commit()" when done.
> > That's it. Looking at the actual machine running the instance, the folders
> > under /tmp/kafka-streams/<stream name>/ only have a .lock file.
> >
> > This seems to have been bootstrapped by the exception:
> >
> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> > completed since the group has already rebalanced and assigned the
> > partitions to another member. This means that the time between subsequent
> > calls to poll() was longer than the configured max.poll.interval.ms,
> which
> > typically implies that the poll loop is spending too much time message
> > processing. You can address this either by increasing the session timeout
> > or by reducing the maximum size of batches returned in poll() with
> > max.poll.records.
> >
> > We are addressing the latter by reducing "max.poll.records" and
> increasing "
> > commit.interval.ms", nonetheless, shouldn't Kafka Streams not worry
> about
> > state dirs if there are no state stores? Since it doesn't seem to do so
> > automatically, can I configured it somehow to achieve this end?
> >
> > Additionally, what could lead to it not being able to recover?
> >
> > On Tue, May 16, 2017 at 3:17 PM Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> >> Great! :)
> >>
> >> On 5/16/17 2:31 AM, Sameer Kumar wrote:
> >>> I see now that my Kafka cluster is very stable, and these errors dont
> >> come
> >>> now.
> >>>
> >>> -Sameer.
> >>>
> >>> On Fri, May 5, 2017 at 7:53 AM, Sameer Kumar <sa...@gmail.com>
> >> wrote:
> >>>
> >>>> Yes, I have upgraded my cluster and client both to version 10.2.1 and
> >>>> currently monitoring the situation.
> >>>> Will report back in case I find any errors. Thanks for the help
> though.
> >>>>
> >>>> -Sameer.
> >>>>
> >>>> On Fri, May 5, 2017 at 3:37 AM, Matthias J. Sax <
> matthias@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> Did you see Eno's reply?
> >>>>>
> >>>>> Please try out Streams 0.10.2.1 -- this should be fixed there. If
> not,
> >>>>> please report back.
> >>>>>
> >>>>> I would also recommend to subscribe to the list. It's self-service
> >>>>> http://kafka.apache.org/contact
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 5/3/17 10:49 PM, Sameer Kumar wrote:
> >>>>>> My brokers are on version 10.1.0 and my clients are on version
> 10.2.0.
> >>>>>> Also, do a reply to all, I am currently not subscribed to the list.
> >>>>>>
> >>>>>> -Sameer.
> >>>>>>
> >>>>>> On Wed, May 3, 2017 at 6:34 PM, Sameer Kumar <
> sam.kum.work@gmail.com>
> >>>>> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> I ran two nodes in my streams compute cluster, they were running
> fine
> >>>>> for
> >>>>>>> few hours before outputting with failure to rebalance errors.
> >>>>>>>
> >>>>>>>
> >>>>>>> I couldnt understand why this happened but I saw one strange
> >>>>> behaviour...
> >>>>>>>
> >>>>>>> at 16:53 on node1, I saw "Failed to lock the state directory"
> error,
> >>>>> this
> >>>>>>> might have caused the partitions to relocate and hence the error.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> I am attaching detailed logs for both the nodes, please see if you
> >> can
> >>>>>>> help.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> Some of the logs for quick reference are these.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> 2017-05-03 16:53:53 ERROR Kafka10Base:44 - Exception caught in
> thread
> >>>>>>> StreamThread-2
> >>>>>>>
> >>>>>>> org.apache.kafka.streams.errors.StreamsException: stream-thread
> >>>>>>> [StreamThread-2] Failed to rebalance
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.StreamThread.runLoop(StreamThread.java:612)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.StreamThread.run(StreamThread.java:368)
> >>>>>>>
> >>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException:
> >>>>>>> stream-thread [StreamThread-2] failed to suspend stream tasks
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.StreamThread.suspendTasksAndState(StreamThrea
> >>>>> d.java:488)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.StreamThread.access$1200(StreamThread.java:69)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.StreamThread$1.onPartitionsRevoked(StreamThre
> >>>>> ad.java:259)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.clients.consu
> >>>>>>> mer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoor
> >>>>>>> dinator.java:396)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.clients.consu
> >>>>>>> mer.internals.AbstractCoordinator.joinGroupIfNeeded(Abstract
> >>>>>>> Coordinator.java:329)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.clients.consu
> >>>>>>> mer.internals.AbstractCoordinator.ensureActiveGroup(Abstract
> >>>>>>> Coordinator.java:303)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.clients.consu
> >>>>>>>
> mer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.clients.consu
> >>>>>>> mer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.clients.consu
> >>>>>>> mer.KafkaConsumer.poll(KafkaConsumer.java:995)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.StreamThread.runLoop(StreamThread.java:582)
> >>>>>>>
> >>>>>>>                ... 1 more
> >>>>>>>
> >>>>>>> Caused by: org.apache.kafka.clients.consumer.CommitFailedException:
> >>>>>>> Commit cannot be completed since the group has already rebalanced
> and
> >>>>>>> assigned the partitions to another member. This means that the time
> >>>>> between
> >>>>>>> subsequent calls to poll() was longer than the configured
> >>>>>>> max.poll.interval.ms, which typically implies that the poll loop
> is
> >>>>>>> spending too much time message processing. You can address this
> >> either
> >>>>> by
> >>>>>>> increasing the session timeout or by reducing the maximum size of
> >>>>> batches
> >>>>>>> returned in poll() with max.poll.records.
> >>>>>>>
> >>>>>>>                at org.apache.kafka.clients.consu
> >>>>>>> mer.internals.ConsumerCoordinator.sendOffsetCommitRequest(Co
> >>>>>>> nsumerCoordinator.java:698)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.clients.consu
> >>>>>>> mer.internals.ConsumerCoordinator.commitOffsetsSync(Consumer
> >>>>>>> Coordinator.java:577)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.clients.consu
> >>>>>>> mer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.StreamThread$3.apply(StreamThread.java:535)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>>
> ssor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.StreamThread.commitOffsets(StreamThread.java:531)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.StreamThread.suspendTasksAndState(StreamThrea
> >>>>> d.java:480)
> >>>>>>>
> >>>>>>>                ... 10 more
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> 2017-05-03 16:53:57 WARN  StreamThread:1184 - Could not create task
> >>>>> 1_38.
> >>>>>>> Will retry.
> >>>>>>>
> >>>>>>> org.apache.kafka.streams.errors.LockException: task [1_38] Failed
> to
> >>>>> lock
> >>>>>>> the state directory: /data/streampoc/LIC2-5/1_38
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.ProcessorStateManager.<init>(ProcessorStateMa
> >>>>>>> nager.java:102)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.AbstractTask.<init>(AbstractTask.java:73)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.StreamTask.<init>(StreamTask.java:108)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.StreamThread$TaskCreator.createTask(StreamThr
> >>>>> ead.java:1207)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.StreamThread$AbstractTaskCreator.retryWithBac
> >>>>>>> koff(StreamThread.java:1180)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.StreamThread.access$500(StreamThread.java:69)
> >>>>>>>
> >>>>>>>                at org.apache.kafka.streams.proce
> >>>>>>> ssor.internals.StreamThread$1.onPartitionsAssigned(StreamThr
> >>>>> ead.java:236)
> >>>>>>>
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>>
> >>>>>>> -Sameer.
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> >>
>
>

Re: Kafka Streams Failed to rebalance error

Posted by Eno Thereska <en...@gmail.com>.
Hi there,

This might be a bug; would you mind opening a JIRA? (Copy-pasting the report below is sufficient.)

Thanks
Eno


Re: Kafka Streams Failed to rebalance error

Posted by João Peixoto <jo...@gmail.com>.
I'm using Kafka Streams 0.10.2.1 and I still see this error:

2017-06-07 20:28:37.211  WARN 73 --- [ StreamThread-1]
o.a.k.s.p.internals.StreamThread         : Could not create task 0_31. Will
retry.

org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock
the state directory for task 0_31
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
[kafka-streams-0.10.2.1.jar!/:na]


It has been printing this for hours now, so it does not recover at all.
The most worrying thing is that this stream definition does not even use
state stores; it literally looks like this:

KStreamBuilder builder = new KStreamBuilder();
KStream<byte[], Message> kStream =
        builder.stream(appOptions.getInput().getTopic());
kStream.process(() -> processor);
// (the real code also assigns and starts the instance)
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

The "processor" does its thing and calls "context().commit()" when done.
That's it. Looking at the actual machine running the instance, the folders
under /tmp/kafka-streams/<stream name>/ only have a .lock file.
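
For context, the processor is shaped roughly like this (a minimal
sketch, not the actual class; "Message" is our own value type and the
class name is made up):

import org.apache.kafka.streams.processor.AbstractProcessor;

// Hypothetical sketch: handle each record, then request a commit,
// matching the description above.
public class CommittingProcessor extends AbstractProcessor<byte[], Message> {
    @Override
    public void process(byte[] key, Message value) {
        // ... do its thing with the record ...
        context().commit(); // ask Streams to commit as soon as possible
    }
}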

This seems to have been triggered by the exception:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms, which
typically implies that the poll loop is spending too much time message
processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with
max.poll.records.

We are addressing the latter by reducing "max.poll.records" and
increasing "commit.interval.ms". Nonetheless, shouldn't Kafka Streams
skip state directories entirely if there are no state stores? Since it
doesn't seem to do so automatically, can I configure it somehow to
achieve this end?
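
For reference, this is roughly how we are applying those settings (a
sketch only; the application id, broker address, path, and values below
are made up for illustration):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
// Fewer records per poll() keeps each loop iteration well under
// max.poll.interval.ms.
streamsConfiguration.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
// Commit less often to spend less time in the commit path per loop.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
// The state directory can at least be moved off /tmp via "state.dir".
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");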

Additionally, what could lead to it not being able to recover?


Re: Kafka Streams Failed to rebalance error

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Great! :)



Re: Kafka Streams Failed to rebalance error

Posted by Sameer Kumar <sa...@gmail.com>.
I see now that my Kafka cluster is very stable, and these errors don't
come up anymore.

-Sameer.


Re: Kafka Streams Failed to rebalance error

Posted by Sameer Kumar <sa...@gmail.com>.
Yes, I have upgraded both my cluster and clients to version 0.10.2.1 and
am currently monitoring the situation.
I will report back if I find any errors. Thanks for the help, though.

-Sameer.


Re: Kafka Streams Failed to rebalance error

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Did you see Eno's reply?

Please try out Streams 0.10.2.1 -- this should be fixed there. If not,
please report back.

I would also recommend subscribing to the list; it's self-service:
http://kafka.apache.org/contact


-Matthias



Re: Kafka Streams Failed to rebalance error

Posted by Sameer Kumar <sa...@gmail.com>.
My brokers are on version 0.10.1.0 and my clients are on version 0.10.2.0.
Also, please reply to all; I am currently not subscribed to the list.
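
For what it's worth, one way to double-check which client jar is
actually loaded at runtime is the version string bundled with
kafka-clients. A minimal sketch; note that AppInfoParser is an internal
utility class, so treat this as a diagnostic convenience rather than a
stable API:

    import org.apache.kafka.common.utils.AppInfoParser;

    public class ClientVersionCheck {
        public static void main(String[] args) {
            // Prints the version of the kafka-clients jar actually on the
            // classpath, e.g. "0.10.2.0":
            System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
        }
    }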

-Sameer.

On Wed, May 3, 2017 at 6:34 PM, Sameer Kumar <sa...@gmail.com> wrote:

> [snip: original message and stack traces quoted in full]

Re: Kafka Streams Failed to rebalance error

Posted by Eno Thereska <en...@gmail.com>.
Hi,

Which version of Kafka are you using? This should be fixed in 0.10.2.1; any chance you could try that release?
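
On the "Failed to lock the state directory" warning in your logs: every
KafkaStreams instance needs its own state directory, and two instances
on the same machine sharing one path will contend for the directory
lock. If your two nodes are separate machines this shouldn't apply, but
it is cheap to rule out. A minimal sketch, with an illustrative path:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class StateDirSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Give each instance on a host a distinct state directory so
            // task directory locks cannot collide across processes:
            props.put(StreamsConfig.STATE_DIR_CONFIG, "/data/streampoc/LIC2-5-a"); // illustrative
        }
    }

Transient lock warnings during a rebalance can also come from threads
within one instance handing tasks over; if I recall correctly, that
retry path is among the things 0.10.2.1 handles more gracefully.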

Thanks
Eno
> On 3 May 2017, at 14:04, Sameer Kumar <sa...@gmail.com> wrote:
> 
> [snip: original message and stack traces quoted in full]
> 
> <node2.zip><node1.zip>