Posted to users@kafka.apache.org by Tianji Li <sk...@gmail.com> on 2017/03/16 20:47:20 UTC

Kafka Streams: lockException

Hi there,

I keep getting these crashes and wonder if anyone knows why. Please let me
know what information I should provide to help with troubleshooting.

I am using 0.10.2.0. My application reads one topic and then calls
groupBy().aggregate() 50 times on different keys.

I use an in-memory store, without backing it to Kafka.
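
For concreteness, a minimal sketch of this kind of topology against the 0.10.2
DSL might look like the following (the topic name, key extractor and store name
are hypothetical, only one of the ~50 aggregations is shown, and the aggregate()
overload that takes a StateStoreSupplier is assumed to be available in this
release):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.state.Stores;

public class MultiAggregateSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "multi-aggregate-sketch"); // hypothetical id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // hypothetical broker
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 20);                   // multiple stream threads

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> input =
                builder.stream(Serdes.String(), Serdes.String(), "input-topic");  // hypothetical topic

        // One of the ~50 aggregations: re-key on some attribute of the value and
        // count per key, backed by a named in-memory store instead of RocksDB.
        input.groupBy((key, value) -> value.split(",")[0], Serdes.String(), Serdes.String())
             .aggregate(
                     () -> 0L,
                     (key, value, aggregate) -> aggregate + 1,
                     Stores.create("agg-by-attr-1")
                           .withKeys(Serdes.String())
                           .withValues(Serdes.Long())
                           .inMemory()
                           .build());

        new KafkaStreams(builder, props).start();
    }
}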

Thanks
Tianji


2017-03-16 16:37:14.060  WARN 26139 --- [StreamThread-14]
o.a.k.s.p.internals.StreamThread         : Could not create task 0_4. Will
retry.

org.apache.kafka.streams.errors.LockException: task [0_4] Failed to lock
the state directory: /tmp/kafka-streams/xxx-test28/0_4
        at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
        at
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
        at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
        at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
        at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
        at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
        at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
        at
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
        at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
        at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
        at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
        at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
        at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
        at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
        at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
        at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

Re: Kafka Streams: lockException

Posted by Tianji Li <sk...@gmail.com>.
Hi Guys,

Great information again as usual, very helpful!

Much appreciated, thanks so much!

Tianji

PS: The Kafka Community is simply great!



On Fri, Mar 17, 2017 at 3:00 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Tianji and Sachin (and also cc'ing people who I remember have reported
> similar RocksDB memory issues),
>
> Sharing my experience with RocksDB tuning and also chatting with the
> RocksDB community:
>
> 1. If you are frequently flushing the state stores (e.g. with high commit
> frequency) then you will end up with huge number of very small memtable
> files, and hence result in very high compaction pressure on RocksDB; if you
> use default number of compaction threads (1) it will not be able to catch
> up with the write throughput and compaction rate, and hence the gradual
> degradation of performance. We have changed the default
> num.compaction.threads in trunk but if you are under released version
> 0.10.2 or older, check your store's flush rate metrics and consider
> increasing the compaction threads.
>
> 2. The most common memory leaks from RocksDB JNI are iterator leaks. Make
> sure to close the iterator return for your range queries / fetches from the
> stores when you are done. If not the corresponding scanned memory will be
> pinned in memory and cannot be compacted.
>
>
> Guozhang
>
>
> On Fri, Mar 17, 2017 at 8:56 AM, Eno Thereska <en...@gmail.com>
> wrote:
>
> > Sachin, you also have a PR for this that could help, right?:
> > https://github.com/apache/kafka/pull/2642#issuecomment-287372367 <
> > https://github.com/apache/kafka/pull/2642#issuecomment-287372367>.
> >
> > Thanks
> > Eno
> >
> >
> > > On 17 Mar 2017, at 15:19, Sachin Mittal <sj...@gmail.com> wrote:
> > >
> > > We also face same issues.
> > > What we have found is that rocksdb is the issue. With many instances of
> > > rocksdb per machine, over the time it slows down due to i/o operations,
> > > resulting in threads getting evicted because max.poll.interval exceeds
> > the
> > > set limit.
> > >
> > > Try running rocksdb in memory https://github.com/facebook/
> > > rocksdb/wiki/How-to-persist-in-memory-RocksDB-database%3F.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > > On Fri, Mar 17, 2017 at 8:34 PM, Tianji Li <sk...@gmail.com> wrote:
> > >
> > >> Hi Eno,
> > >>
> > >> I used 150, 50, 20 threads and the probabilities of crashing decreased
> > with
> > >> this number. When using 1 thread, no crash!
> > >>
> > >> My max.poll.interval is 5 minutes and all the processing won't last
> that
> > >> long, so that parameter does not help.
> > >>
> > >>
> > >> Thanks
> > >> Tianji
> > >>
> > >> On Thu, Mar 16, 2017 at 6:09 PM, Eno Thereska <eno.thereska@gmail.com
> >
> > >> wrote:
> > >>
> > >>> Hi Tianji,
> > >>>
> > >>> How many threads does your app use?
> > >>>
> > >>> One reason is explained here: https://groups.google.com/
> > >>> forum/#!topic/confluent-platform/wgCSuwIJo5g <
> > https://groups.google.com/
> > >>> forum/#!topic/confluent-platform/wgCSuwIJo5g>, you might want to
> > >> increase
> > >>> max.poll.interval config value.
> > >>> If that doesn't work, could you revert to using one thread for now.
> > Also
> > >>> let us know either way since we might need to open a bug report.
> > >>>
> > >>> Thanks
> > >>> Eno
> > >>>
> > >>>> On 16 Mar 2017, at 20:47, Tianji Li <sk...@gmail.com> wrote:
> > >>>>
> > >>>> Hi there,
> > >>>>
> > >>>> I always got this crashes and wonder if anyone knows why. Please let
> > me
> > >>>> know what information I should provide to help with trouble
> shooting.
> > >>>>
> > >>>> I am using 0.10.2.0. My application is reading one topic and then
> > >>>> groupBy().aggregate() 50 times on different keys.
> > >>>>
> > >>>> I use memory store, without backing to kafka.
> > >>>>
> > >>>> Thanks
> > >>>> Tianji
> > >>>>
> > >>>>
> > >>>> 2017-03-16 16:37:14.060  WARN 26139 --- [StreamThread-14]
> > >>>> o.a.k.s.p.internals.StreamThread         : Could not create task
> 0_4.
> > >>> Will
> > >>>> retry.
> > >>>>
> > >>>> org.apache.kafka.streams.errors.LockException: task [0_4] Failed to
> > >> lock
> > >>>> the state directory: /tmp/kafka-streams/xxx-test28/0_4
> > >>>>       at
> > >>>> org.apache.kafka.streams.processor.internals.
> > >>> ProcessorStateManager.<init>(ProcessorStateManager.java:102)
> > >>>>       at
> > >>>> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(
> > >>> AbstractTask.java:73)
> > >>>>       at
> > >>>> org.apache.kafka.streams.processor.internals.
> > >>> StreamTask.<init>(StreamTask.java:108)
> > >>>>       at
> > >>>> org.apache.kafka.streams.processor.internals.
> > >>> StreamThread.createStreamTask(StreamThread.java:834)
> > >>>>       at
> > >>>> org.apache.kafka.streams.processor.internals.StreamThread$Ta
> > skCreator.
> > >>> createTask(StreamThread.java:1207)
> > >>>>       at
> > >>>> org.apache.kafka.streams.processor.internals.StreamThread$
> > >>> AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> > >>>>       at
> > >>>> org.apache.kafka.streams.processor.internals.
> > >>> StreamThread.addStreamTasks(StreamThread.java:937)
> > >>>>       at
> > >>>> org.apache.kafka.streams.processor.internals.
> StreamThread.access$500(
> > >>> StreamThread.java:69)
> > >>>>       at
> > >>>> org.apache.kafka.streams.processor.internals.StreamThread$1.
> > >>> onPartitionsAssigned(StreamThread.java:236)
> > >>>>       at
> > >>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> > >>> onJoinComplete(ConsumerCoordinator.java:255)
> > >>>>       at
> > >>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > >>> joinGroupIfNeeded(AbstractCoordinator.java:339)
> > >>>>       at
> > >>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > >>> ensureActiveGroup(AbstractCoordinator.java:303)
> > >>>>       at
> > >>>> org.apache.kafka.clients.consumer.internals.
> ConsumerCoordinator.poll(
> > >>> ConsumerCoordinator.java:286)
> > >>>>       at
> > >>>> org.apache.kafka.clients.consumer.KafkaConsumer.
> > >>> pollOnce(KafkaConsumer.java:1030)
> > >>>>       at
> > >>>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > >>> KafkaConsumer.java:995)
> > >>>>       at
> > >>>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > >>> StreamThread.java:582)
> > >>>>       at
> > >>>> org.apache.kafka.streams.processor.internals.
> > >>> StreamThread.run(StreamThread.java:368)
> > >>>
> > >>>
> > >>
> >
> >
>
>
> --
> -- Guozhang
>

Re: Kafka Streams: lockException

Posted by Damian Guy <da...@gmail.com>.
Mahendra,
The WAL is turned off in Kafka Streams. That file is just the RocksDB log;
you can probably just delete the old ones:
https://github.com/facebook/rocksdb/issues/849
In 0.10.0.1 there is no way of configuring RocksDB via Kafka Streams.

Thanks,
Damian

On Mon, 20 Mar 2017 at 09:22 Mahendra Kariya <ma...@go-jek.com>
wrote:

> We did some more analysis on why the disk utilisation is continuously
> increasing. Turns out it's the RocksDB WAL that's utilising most of the
> disk space. The LOG.old WAL files are not getting deleted. Ideally they
> should have been. RocksDB provides certain configuration for purging WAL
> files
> <
> https://github.com/facebook/rocksdb/wiki/basic-operations#purging-wal-files
> >.
> But I am not sure how to set these configs. Any help would be really
> appreciated. Just for reference, our Kafka brokers are on v0.10.0.1 and
> RocksDB version is 4.8.0.
>
>
>
>
> On Mon, Mar 20, 2017 at 12:29 PM, Mahendra Kariya <
> mahendra.kariya@go-jek.com> wrote:
>
> > Hey Guozhang,
> >
> > Thanks a lot for these insights. We are facing the exact same problem as
> > Tianji. Our commit frequency is also quite high. We flush almost around
> 16K
> > messages per minute to Kafka at the end of the topology.
> >
> > Another issue that we are facing is that rocksdb is not deleting old
> data.
> > We have set the time window retention duration to 1 hour, but the disk
> size
> > is constantly increasing. Ideally, the disk utilisation graph should
> > plateau after some time.
> >
> > We would like to apply the config change suggestions that you have given.
> > But we are on Kafka 0.10.0.1. And from the docs, it seems
> > rocksdb.config.setter is not available for this version. Is there any
> > other way for us to configure rocksdb?
> >
> > Does Kafka 0.10.0.1 emit any rocksdb related metrics that we can monitor
> > and set up alerts on?
> >
> >
> > Thanks!
> >
> >
> >
>

Re: Kafka Streams: lockException

Posted by Mahendra Kariya <ma...@go-jek.com>.
We did some more analysis on why the disk utilisation is continuously
increasing. Turns out it's the RocksDB WAL that's utilising most of the
disk space. The LOG.old WAL files are not getting deleted. Ideally they
should have been. RocksDB provides certain configuration for purging WAL
files
<https://github.com/facebook/rocksdb/wiki/basic-operations#purging-wal-files>.
But I am not sure how to set these configs. Any help would be really
appreciated. Just for reference, our Kafka brokers are on v0.10.0.1 and
RocksDB version is 4.8.0.




On Mon, Mar 20, 2017 at 12:29 PM, Mahendra Kariya <
mahendra.kariya@go-jek.com> wrote:

> Hey Guozhang,
>
> Thanks a lot for these insights. We are facing the exact same problem as
> Tianji. Our commit frequency is also quite high. We flush almost around 16K
> messages per minute to Kafka at the end of the topology.
>
> Another issue that we are facing is that rocksdb is not deleting old data.
> We have set the time window retention duration to 1 hour, but the disk size
> is constantly increasing. Ideally, the disk utilisation graph should
> plateau after some time.
>
> We would like to apply the config change suggestions that you have given.
> But we are on Kafka 0.10.0.1. And from the docs, it seems
> rocksdb.config.setter is not available for this version. Is there any
> other way for us to configure rocksdb?
>
> Does Kafka 0.10.0.1 emit any rocksdb related metrics that we can monitor
> and set up alerts on?
>
>
> Thanks!
>
>
>

Re: Kafka Streams: lockException

Posted by Mahendra Kariya <ma...@go-jek.com>.
Hey Guozhang,

Thanks a lot for these insights. We are facing the exact same problem as
Tianji. Our commit frequency is also quite high. We flush around 16K
messages per minute to Kafka at the end of the topology.

Another issue we are facing is that RocksDB is not deleting old data.
We have set the time-window retention duration to 1 hour, but the disk size
is constantly increasing. Ideally, the disk utilisation graph should
plateau after some time.

We would like to apply the config change suggestions that you have given,
but we are on Kafka 0.10.0.1, and from the docs it seems
rocksdb.config.setter is not available for this version. Is there any other
way for us to configure RocksDB?

Does Kafka 0.10.0.1 emit any RocksDB-related metrics that we can monitor
and set up alerts on?


Thanks!

Re: Kafka Streams: lockException

Posted by Guozhang Wang <wa...@gmail.com>.
Tianji and Sachin (and also cc'ing people who I remember have reported
similar RocksDB memory issues),

Sharing my experience with RocksDB tuning and also chatting with the
RocksDB community:

1. If you are flushing the state stores frequently (e.g. with a high commit
frequency) you will end up with a huge number of very small memtable files,
which results in very high compaction pressure on RocksDB; with the default
number of compaction threads (1), compaction cannot keep up with the write
throughput, and performance gradually degrades. We have changed the default
number of compaction threads in trunk, but if you are on released version
0.10.2 or older, check your store's flush-rate metrics and consider
increasing the compaction threads.
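
As a rough sketch of what that tuning could look like on releases that expose
the rocksdb.config.setter config (the class name and thread counts below are
illustrative, not recommendations), a custom RocksDBConfigSetter can raise the
background parallelism:

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class CompactionTuningConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // Let RocksDB use more background threads so compaction can keep up
        // with frequent memtable flushes; the values here are only illustrative.
        options.setIncreaseParallelism(4);
        options.setMaxBackgroundCompactions(4);
    }
}

// Registration in the (hypothetical) application config:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CompactionTuningConfigSetter.class);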

2. The most common memory leaks from the RocksDB JNI are iterator leaks. Make
sure to close the iterators returned by your range queries / fetches from the
stores when you are done. Otherwise the corresponding scanned memory stays
pinned and cannot be compacted.
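
A minimal sketch of the safe pattern, assuming a read-only store handle
obtained elsewhere (e.g. via interactive queries); the key point is the
try-with-resources block, which closes the RocksDB iterator even if the loop
throws:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class RangeScanExample {
    static long sumRange(final ReadOnlyKeyValueStore<String, Long> store) {
        long total = 0L;
        // try-with-resources guarantees iter.close() runs, releasing the
        // memory that RocksDB pins for the scan.
        try (KeyValueIterator<String, Long> iter = store.range("a", "z")) {
            while (iter.hasNext()) {
                KeyValue<String, Long> entry = iter.next();
                total += entry.value;
            }
        }
        return total;
    }
}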


Guozhang


On Fri, Mar 17, 2017 at 8:56 AM, Eno Thereska <en...@gmail.com>
wrote:

> Sachin, you also have a PR for this that could help, right?:
> https://github.com/apache/kafka/pull/2642#issuecomment-287372367 <
> https://github.com/apache/kafka/pull/2642#issuecomment-287372367>.
>
> Thanks
> Eno
>
>
> > On 17 Mar 2017, at 15:19, Sachin Mittal <sj...@gmail.com> wrote:
> >
> > We also face same issues.
> > What we have found is that rocksdb is the issue. With many instances of
> > rocksdb per machine, over the time it slows down due to i/o operations,
> > resulting in threads getting evicted because max.poll.interval exceeds
> the
> > set limit.
> >
> > Try running rocksdb in memory https://github.com/facebook/
> > rocksdb/wiki/How-to-persist-in-memory-RocksDB-database%3F.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Fri, Mar 17, 2017 at 8:34 PM, Tianji Li <sk...@gmail.com> wrote:
> >
> >> Hi Eno,
> >>
> >> I used 150, 50, 20 threads and the probabilities of crashing decreased
> with
> >> this number. When using 1 thread, no crash!
> >>
> >> My max.poll.interval is 5 minutes and all the processing won't last that
> >> long, so that parameter does not help.
> >>
> >>
> >> Thanks
> >> Tianji
> >>
> >> On Thu, Mar 16, 2017 at 6:09 PM, Eno Thereska <en...@gmail.com>
> >> wrote:
> >>
> >>> Hi Tianji,
> >>>
> >>> How many threads does your app use?
> >>>
> >>> One reason is explained here: https://groups.google.com/
> >>> forum/#!topic/confluent-platform/wgCSuwIJo5g <
> https://groups.google.com/
> >>> forum/#!topic/confluent-platform/wgCSuwIJo5g>, you might want to
> >> increase
> >>> max.poll.interval config value.
> >>> If that doesn't work, could you revert to using one thread for now.
> Also
> >>> let us know either way since we might need to open a bug report.
> >>>
> >>> Thanks
> >>> Eno
> >>>
> >>>> On 16 Mar 2017, at 20:47, Tianji Li <sk...@gmail.com> wrote:
> >>>>
> >>>> Hi there,
> >>>>
> >>>> I always got this crashes and wonder if anyone knows why. Please let
> me
> >>>> know what information I should provide to help with trouble shooting.
> >>>>
> >>>> I am using 0.10.2.0. My application is reading one topic and then
> >>>> groupBy().aggregate() 50 times on different keys.
> >>>>
> >>>> I use memory store, without backing to kafka.
> >>>>
> >>>> Thanks
> >>>> Tianji
> >>>>
> >>>>
> >>>> 2017-03-16 16:37:14.060  WARN 26139 --- [StreamThread-14]
> >>>> o.a.k.s.p.internals.StreamThread         : Could not create task 0_4.
> >>> Will
> >>>> retry.
> >>>>
> >>>> org.apache.kafka.streams.errors.LockException: task [0_4] Failed to
> >> lock
> >>>> the state directory: /tmp/kafka-streams/xxx-test28/0_4
> >>>>       at
> >>>> org.apache.kafka.streams.processor.internals.
> >>> ProcessorStateManager.<init>(ProcessorStateManager.java:102)
> >>>>       at
> >>>> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(
> >>> AbstractTask.java:73)
> >>>>       at
> >>>> org.apache.kafka.streams.processor.internals.
> >>> StreamTask.<init>(StreamTask.java:108)
> >>>>       at
> >>>> org.apache.kafka.streams.processor.internals.
> >>> StreamThread.createStreamTask(StreamThread.java:834)
> >>>>       at
> >>>> org.apache.kafka.streams.processor.internals.StreamThread$Ta
> skCreator.
> >>> createTask(StreamThread.java:1207)
> >>>>       at
> >>>> org.apache.kafka.streams.processor.internals.StreamThread$
> >>> AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> >>>>       at
> >>>> org.apache.kafka.streams.processor.internals.
> >>> StreamThread.addStreamTasks(StreamThread.java:937)
> >>>>       at
> >>>> org.apache.kafka.streams.processor.internals.StreamThread.access$500(
> >>> StreamThread.java:69)
> >>>>       at
> >>>> org.apache.kafka.streams.processor.internals.StreamThread$1.
> >>> onPartitionsAssigned(StreamThread.java:236)
> >>>>       at
> >>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> >>> onJoinComplete(ConsumerCoordinator.java:255)
> >>>>       at
> >>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> >>> joinGroupIfNeeded(AbstractCoordinator.java:339)
> >>>>       at
> >>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> >>> ensureActiveGroup(AbstractCoordinator.java:303)
> >>>>       at
> >>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> >>> ConsumerCoordinator.java:286)
> >>>>       at
> >>>> org.apache.kafka.clients.consumer.KafkaConsumer.
> >>> pollOnce(KafkaConsumer.java:1030)
> >>>>       at
> >>>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> >>> KafkaConsumer.java:995)
> >>>>       at
> >>>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> >>> StreamThread.java:582)
> >>>>       at
> >>>> org.apache.kafka.streams.processor.internals.
> >>> StreamThread.run(StreamThread.java:368)
> >>>
> >>>
> >>
>
>


-- 
-- Guozhang

Re: Kafka Streams: lockException

Posted by Eno Thereska <en...@gmail.com>.
Sachin, you also have a PR for this that could help, right?: https://github.com/apache/kafka/pull/2642#issuecomment-287372367

Thanks
Eno


> On 17 Mar 2017, at 15:19, Sachin Mittal <sj...@gmail.com> wrote:
> 
> We also face same issues.
> What we have found is that rocksdb is the issue. With many instances of
> rocksdb per machine, over the time it slows down due to i/o operations,
> resulting in threads getting evicted because max.poll.interval exceeds the
> set limit.
> 
> Try running rocksdb in memory https://github.com/facebook/
> rocksdb/wiki/How-to-persist-in-memory-RocksDB-database%3F.
> 
> Thanks
> Sachin
> 
> 
> 
> On Fri, Mar 17, 2017 at 8:34 PM, Tianji Li <sk...@gmail.com> wrote:
> 
>> Hi Eno,
>> 
>> I used 150, 50, 20 threads and the probabilities of crashing decreased with
>> this number. When using 1 thread, no crash!
>> 
>> My max.poll.interval is 5 minutes and all the processing won't last that
>> long, so that parameter does not help.
>> 
>> 
>> Thanks
>> Tianji
>> 
>> On Thu, Mar 16, 2017 at 6:09 PM, Eno Thereska <en...@gmail.com>
>> wrote:
>> 
>>> Hi Tianji,
>>> 
>>> How many threads does your app use?
>>> 
>>> One reason is explained here: https://groups.google.com/
>>> forum/#!topic/confluent-platform/wgCSuwIJo5g <https://groups.google.com/
>>> forum/#!topic/confluent-platform/wgCSuwIJo5g>, you might want to
>> increase
>>> max.poll.interval config value.
>>> If that doesn't work, could you revert to using one thread for now. Also
>>> let us know either way since we might need to open a bug report.
>>> 
>>> Thanks
>>> Eno
>>> 
>>>> On 16 Mar 2017, at 20:47, Tianji Li <sk...@gmail.com> wrote:
>>>> 
>>>> Hi there,
>>>> 
>>>> I always got this crashes and wonder if anyone knows why. Please let me
>>>> know what information I should provide to help with trouble shooting.
>>>> 
>>>> I am using 0.10.2.0. My application is reading one topic and then
>>>> groupBy().aggregate() 50 times on different keys.
>>>> 
>>>> I use memory store, without backing to kafka.
>>>> 
>>>> Thanks
>>>> Tianji
>>>> 
>>>> 
>>>> 2017-03-16 16:37:14.060  WARN 26139 --- [StreamThread-14]
>>>> o.a.k.s.p.internals.StreamThread         : Could not create task 0_4.
>>> Will
>>>> retry.
>>>> 
>>>> org.apache.kafka.streams.errors.LockException: task [0_4] Failed to
>> lock
>>>> the state directory: /tmp/kafka-streams/xxx-test28/0_4
>>>>       at
>>>> org.apache.kafka.streams.processor.internals.
>>> ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>>>>       at
>>>> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(
>>> AbstractTask.java:73)
>>>>       at
>>>> org.apache.kafka.streams.processor.internals.
>>> StreamTask.<init>(StreamTask.java:108)
>>>>       at
>>>> org.apache.kafka.streams.processor.internals.
>>> StreamThread.createStreamTask(StreamThread.java:834)
>>>>       at
>>>> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.
>>> createTask(StreamThread.java:1207)
>>>>       at
>>>> org.apache.kafka.streams.processor.internals.StreamThread$
>>> AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>>>>       at
>>>> org.apache.kafka.streams.processor.internals.
>>> StreamThread.addStreamTasks(StreamThread.java:937)
>>>>       at
>>>> org.apache.kafka.streams.processor.internals.StreamThread.access$500(
>>> StreamThread.java:69)
>>>>       at
>>>> org.apache.kafka.streams.processor.internals.StreamThread$1.
>>> onPartitionsAssigned(StreamThread.java:236)
>>>>       at
>>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
>>> onJoinComplete(ConsumerCoordinator.java:255)
>>>>       at
>>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
>>> joinGroupIfNeeded(AbstractCoordinator.java:339)
>>>>       at
>>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
>>> ensureActiveGroup(AbstractCoordinator.java:303)
>>>>       at
>>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
>>> ConsumerCoordinator.java:286)
>>>>       at
>>>> org.apache.kafka.clients.consumer.KafkaConsumer.
>>> pollOnce(KafkaConsumer.java:1030)
>>>>       at
>>>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
>>> KafkaConsumer.java:995)
>>>>       at
>>>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>>> StreamThread.java:582)
>>>>       at
>>>> org.apache.kafka.streams.processor.internals.
>>> StreamThread.run(StreamThread.java:368)
>>> 
>>> 
>> 


Re: Kafka Streams: lockException

Posted by Sachin Mittal <sj...@gmail.com>.
We also face the same issues.
What we have found is that RocksDB is the issue. With many instances of
RocksDB per machine, it slows down over time due to I/O, resulting in
threads getting evicted because the time between polls exceeds the
configured max.poll.interval.ms limit.

Try running RocksDB in memory:
https://github.com/facebook/rocksdb/wiki/How-to-persist-in-memory-RocksDB-database%3F

Thanks
Sachin



On Fri, Mar 17, 2017 at 8:34 PM, Tianji Li <sk...@gmail.com> wrote:

> Hi Eno,
>
> I used 150, 50, 20 threads and the probabilities of crashing decreased with
> this number. When using 1 thread, no crash!
>
> My max.poll.interval is 5 minutes and all the processing won't last that
> long, so that parameter does not help.
>
>
> Thanks
> Tianji
>
> On Thu, Mar 16, 2017 at 6:09 PM, Eno Thereska <en...@gmail.com>
> wrote:
>
> > Hi Tianji,
> >
> > How many threads does your app use?
> >
> > One reason is explained here: https://groups.google.com/
> > forum/#!topic/confluent-platform/wgCSuwIJo5g <https://groups.google.com/
> > forum/#!topic/confluent-platform/wgCSuwIJo5g>, you might want to
> increase
> > max.poll.interval config value.
> > If that doesn't work, could you revert to using one thread for now. Also
> > let us know either way since we might need to open a bug report.
> >
> > Thanks
> > Eno
> >
> > > On 16 Mar 2017, at 20:47, Tianji Li <sk...@gmail.com> wrote:
> > >
> > > Hi there,
> > >
> > > I always got this crashes and wonder if anyone knows why. Please let me
> > > know what information I should provide to help with trouble shooting.
> > >
> > > I am using 0.10.2.0. My application is reading one topic and then
> > > groupBy().aggregate() 50 times on different keys.
> > >
> > > I use memory store, without backing to kafka.
> > >
> > > Thanks
> > > Tianji
> > >
> > >
> > > 2017-03-16 16:37:14.060  WARN 26139 --- [StreamThread-14]
> > > o.a.k.s.p.internals.StreamThread         : Could not create task 0_4.
> > Will
> > > retry.
> > >
> > > org.apache.kafka.streams.errors.LockException: task [0_4] Failed to
> lock
> > > the state directory: /tmp/kafka-streams/xxx-test28/0_4
> > >        at
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.<init>(ProcessorStateManager.java:102)
> > >        at
> > > org.apache.kafka.streams.processor.internals.AbstractTask.<init>(
> > AbstractTask.java:73)
> > >        at
> > > org.apache.kafka.streams.processor.internals.
> > StreamTask.<init>(StreamTask.java:108)
> > >        at
> > > org.apache.kafka.streams.processor.internals.
> > StreamThread.createStreamTask(StreamThread.java:834)
> > >        at
> > > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.
> > createTask(StreamThread.java:1207)
> > >        at
> > > org.apache.kafka.streams.processor.internals.StreamThread$
> > AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> > >        at
> > > org.apache.kafka.streams.processor.internals.
> > StreamThread.addStreamTasks(StreamThread.java:937)
> > >        at
> > > org.apache.kafka.streams.processor.internals.StreamThread.access$500(
> > StreamThread.java:69)
> > >        at
> > > org.apache.kafka.streams.processor.internals.StreamThread$1.
> > onPartitionsAssigned(StreamThread.java:236)
> > >        at
> > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> > onJoinComplete(ConsumerCoordinator.java:255)
> > >        at
> > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > joinGroupIfNeeded(AbstractCoordinator.java:339)
> > >        at
> > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > ensureActiveGroup(AbstractCoordinator.java:303)
> > >        at
> > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> > ConsumerCoordinator.java:286)
> > >        at
> > > org.apache.kafka.clients.consumer.KafkaConsumer.
> > pollOnce(KafkaConsumer.java:1030)
> > >        at
> > > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > KafkaConsumer.java:995)
> > >        at
> > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:582)
> > >        at
> > > org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:368)
> >
> >
>

Re: Kafka Streams: lockException

Posted by Tianji Li <sk...@gmail.com>.
Hi Eno,

I tried 150, 50, and 20 threads, and the probability of crashing decreased
along with the thread count. When using 1 thread, there is no crash!

My max.poll.interval.ms is 5 minutes and none of the processing takes that
long, so that parameter does not help.


Thanks
Tianji

On Thu, Mar 16, 2017 at 6:09 PM, Eno Thereska <en...@gmail.com>
wrote:

> Hi Tianji,
>
> How many threads does your app use?
>
> One reason is explained here: https://groups.google.com/
> forum/#!topic/confluent-platform/wgCSuwIJo5g <https://groups.google.com/
> forum/#!topic/confluent-platform/wgCSuwIJo5g>, you might want to increase
> max.poll.interval config value.
> If that doesn't work, could you revert to using one thread for now. Also
> let us know either way since we might need to open a bug report.
>
> Thanks
> Eno
>
> > On 16 Mar 2017, at 20:47, Tianji Li <sk...@gmail.com> wrote:
> >
> > Hi there,
> >
> > I always got this crashes and wonder if anyone knows why. Please let me
> > know what information I should provide to help with trouble shooting.
> >
> > I am using 0.10.2.0. My application is reading one topic and then
> > groupBy().aggregate() 50 times on different keys.
> >
> > I use memory store, without backing to kafka.
> >
> > Thanks
> > Tianji
> >
> >
> > 2017-03-16 16:37:14.060  WARN 26139 --- [StreamThread-14]
> > o.a.k.s.p.internals.StreamThread         : Could not create task 0_4.
> Will
> > retry.
> >
> > org.apache.kafka.streams.errors.LockException: task [0_4] Failed to lock
> > the state directory: /tmp/kafka-streams/xxx-test28/0_4
> >        at
> > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.<init>(ProcessorStateManager.java:102)
> >        at
> > org.apache.kafka.streams.processor.internals.AbstractTask.<init>(
> AbstractTask.java:73)
> >        at
> > org.apache.kafka.streams.processor.internals.
> StreamTask.<init>(StreamTask.java:108)
> >        at
> > org.apache.kafka.streams.processor.internals.
> StreamThread.createStreamTask(StreamThread.java:834)
> >        at
> > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.
> createTask(StreamThread.java:1207)
> >        at
> > org.apache.kafka.streams.processor.internals.StreamThread$
> AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> >        at
> > org.apache.kafka.streams.processor.internals.
> StreamThread.addStreamTasks(StreamThread.java:937)
> >        at
> > org.apache.kafka.streams.processor.internals.StreamThread.access$500(
> StreamThread.java:69)
> >        at
> > org.apache.kafka.streams.processor.internals.StreamThread$1.
> onPartitionsAssigned(StreamThread.java:236)
> >        at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> onJoinComplete(ConsumerCoordinator.java:255)
> >        at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> joinGroupIfNeeded(AbstractCoordinator.java:339)
> >        at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureActiveGroup(AbstractCoordinator.java:303)
> >        at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> ConsumerCoordinator.java:286)
> >        at
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1030)
> >        at
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:995)
> >        at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:582)
> >        at
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:368)
>
>

Re: Kafka Streams: lockException

Posted by Eno Thereska <en...@gmail.com>.
Hi Tianji,

How many threads does your app use? 

One possible reason is explained here: https://groups.google.com/forum/#!topic/confluent-platform/wgCSuwIJo5g; you might want to increase the max.poll.interval.ms config value.
If that doesn't work, could you revert to using one thread for now? Also let us know either way, since we might need to open a bug report.
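
A minimal sketch of that change, assuming consumer configs set on the Streams
Properties are forwarded to the embedded consumers (the application id, broker
address and timeout value below are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class PollIntervalConfigSketch {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");     // hypothetical id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // hypothetical broker
        // Give each stream thread more time between poll() calls before the
        // consumer group considers it failed.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);        // 10 minutes
        return props;
    }
}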

Thanks
Eno

> On 16 Mar 2017, at 20:47, Tianji Li <sk...@gmail.com> wrote:
> 
> Hi there,
> 
> I always got this crashes and wonder if anyone knows why. Please let me
> know what information I should provide to help with trouble shooting.
> 
> I am using 0.10.2.0. My application is reading one topic and then
> groupBy().aggregate() 50 times on different keys.
> 
> I use memory store, without backing to kafka.
> 
> Thanks
> Tianji
> 
> 
> 2017-03-16 16:37:14.060  WARN 26139 --- [StreamThread-14]
> o.a.k.s.p.internals.StreamThread         : Could not create task 0_4. Will
> retry.
> 
> org.apache.kafka.streams.errors.LockException: task [0_4] Failed to lock
> the state directory: /tmp/kafka-streams/xxx-test28/0_4
>        at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>        at
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>        at
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>        at
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>        at
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>        at
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>        at
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>        at
> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>        at
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>        at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>        at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>        at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>        at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>        at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>        at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>        at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>        at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)