Posted to users@kafka.apache.org by Sachin Mittal <sj...@gmail.com> on 2017/04/09 11:45:58 UTC

In kafka streams consumer seems to hang while retrieving the offsets

Hi,
In my Kafka Streams application cluster, on one or more instances I see some
threads always waiting with the following stack.

Every time I check with jstack I see the following trace.

Is this some kind of new deadlock that we have failed to identify?

Thanks
Sachin

Here is the stack trace:
------------------------------------------------------------
"StreamThread-4" #20 prio=5 os_prio=0 tid=0x00007fb814be3000 nid=0x19bf runnable [0x00007fb7cb4f6000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x0000000701c50c98> (a sun.nio.ch.Util$3)
        - locked <0x0000000701c50c88> (a java.util.Collections$UnmodifiableSet)
        - locked <0x0000000701c4f6a8> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.kafka.common.network.Selector.select(Selector.java:489)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
        - locked <0x0000000701c5da48> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
        at org.apache.kafka.clients.consumer.internals.Fetcher.retrieveOffsetsByTimes(Fetcher.java:422)
        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:370)
        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:227)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1592)
        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1265)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:213)

Re: In kafka streams consumer seems to hang while retrieving the offsets

Posted by Sachin Mittal <sj...@gmail.com>.
I have made these changes and also set
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG to 180000.
The producer now sometimes fails after 3 minutes; earlier it used to fail
after 30 seconds (the default value).

So I was wondering what the reason for this could be, and how high this
value should go.
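
A rough way to reason about the question above: assuming each send attempt
can wait up to request.timeout.ms before it is retried, and the producer
pauses retry.backoff.ms between attempts, the worst-case time before a record
finally fails grows with both the timeout and the retry count. The sketch
below is only that arithmetic. The class and method names are hypothetical,
and the 100 ms backoff is the producer default assumed here, not a value
stated in this thread.

```java
public class ProducerTimeoutBudget {
    // Rough upper bound, assuming every attempt uses its full request timeout:
    // (retries + 1) attempts, plus one backoff pause before each retry.
    static long worstCaseMs(int retries, long requestTimeoutMs, long retryBackoffMs) {
        return (retries + 1L) * requestTimeoutMs + retries * retryBackoffMs;
    }

    public static void main(String[] args) {
        // No retries, 30 second timeout.
        System.out.println(worstCaseMs(0, 30_000L, 100L));
        // 10 retries, 180 second timeout (the values mentioned in this thread).
        System.out.println(worstCaseMs(10, 180_000L, 100L));
    }
}
```

Under these assumptions, 10 retries with a 180 second timeout allow roughly
33 minutes before the final failure, so raising the timeout further mostly
delays the error rather than explaining it.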

Thanks
Sachin


On Mon, Apr 10, 2017 at 5:00 PM, Eno Thereska <en...@gmail.com>
wrote:

> Hi Sachin,
>
> In 0.10.2.1 we've changed the default value of max.poll.interval.ms (to
> avoid rebalancing during recovery) as well as the default value of the
> streams producer retries (to retry during a temporary broker failure). I
> think you are aware of the changes, but just double checking. You don't
> need to wait for 0.10.2.1, you can make the changes directly yourself:
>
> final Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, ID);
> ...
> props.put(ProducerConfig.RETRIES_CONFIG, 10);
> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
> Integer.toString(Integer.MAX_VALUE));
>
> This doesn't address the RocksDB issue though, still looking into that.
>
> Thanks
> Eno

Re: In kafka streams consumer seems to hang while retrieving the offsets

Posted by Eno Thereska <en...@gmail.com>.
Hi Sachin,

In 0.10.2.1 we've changed the default value of max.poll.interval.ms (to avoid rebalancing during recovery) as well as the default value of the streams producer retries (to retry during a temporary broker failure). I think you are aware of the changes, but just double checking. You don't need to wait for 0.10.2.1, you can make the changes directly yourself:

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, ID);
...
props.put(ProducerConfig.RETRIES_CONFIG, 10);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));

This doesn't address the RocksDB issue though, still looking into that.
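
For reference, the overrides above can be sketched as a self-contained
helper. To keep it runnable without the Kafka jars, this version uses the
plain config key strings that the StreamsConfig, ProducerConfig and
ConsumerConfig constants resolve to. The class name and the application id
are hypothetical, and required settings such as the bootstrap servers are
left out.

```java
import java.util.Properties;

public class StreamsOverrides {
    // Builds only the overrides discussed in this message; a real application
    // would add its remaining settings to the same Properties object.
    static Properties build(String applicationId) {
        Properties props = new Properties();
        // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("application.id", applicationId);
        // ProducerConfig.RETRIES_CONFIG: retry during a temporary broker failure
        props.put("retries", 10);
        // ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG: avoid rebalancing during recovery
        props.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("my-streams-app"); // hypothetical application id
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```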

Thanks
Eno

> On 9 Apr 2017, at 22:55, Sachin Mittal <sj...@gmail.com> wrote:
> 
> Let me try to get the debug log when this error happens.
> 
> Right now we have three instances each with 4 threads consuming from 12
> partition topic.
> So one thread per partition.
> 
> The application is running fine much better than before. Now it usually
> runs for a week even during peak load.
> 
> Sometime out of blue either rocksdb throws an exception with a single
> character (which I guess is a known issue with rocks db fixed in some next
> release).
> Or the producer gets timed out while committing some changelog topic
> record. I had increased the timeout from 30 seconds to 180 seconds, but it
> still throws exception for that time also.
> 
> Not sure if these are due to VM issue or network.
> 
> But whenever something like this happens, the application goes into
> rebalance and soon things take turn for worse. Soon some of the threads go
> into deadlock with above stack trace and application is now in perpetual
> rebalance state.
> 
> Only way to resolve this is kill all instances using -9 and restart the
> instances one by one.
> 
> So also long as we have a steady state of one thread per partition
> everything is working fine. I am still working out a way to limit the
> changelog topic size by more aggressive compaction and let me see if that
> will make things better.
> 
> I will try to get the logs when this happens next time.
> 
> Thanks
> Sachin


Re: In kafka streams consumer seems to hang while retrieving the offsets

Posted by Sachin Mittal <sj...@gmail.com>.
Let me try to get the debug log when this error happens.

Right now we have three instances, each with 4 threads, consuming from a
12-partition topic.
So one thread per partition.
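
The arithmetic behind "one thread per partition", and what happens to it when
an instance drops out, can be sketched as follows. This assumes partitions
are spread as evenly as possible over the live threads; it does not model the
actual Kafka Streams assignor, and the class and method names are
hypothetical.

```java
public class TaskSpread {
    // Largest number of partitions any single thread must own when partitions
    // are spread as evenly as possible (ceiling division).
    static int maxPartitionsPerThread(int partitions, int instances, int threadsPerInstance) {
        int threads = instances * threadsPerInstance;
        return (partitions + threads - 1) / threads;
    }

    public static void main(String[] args) {
        // Steady state described above: 3 instances x 4 threads, 12 partitions.
        System.out.println(maxPartitionsPerThread(12, 3, 4));
        // One instance down: the remaining 8 threads share the 12 partitions.
        System.out.println(maxPartitionsPerThread(12, 2, 4));
    }
}
```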

The application is running fine, much better than before. It now usually
runs for a week, even during peak load.

Sometimes, out of the blue, either RocksDB throws an exception with a single
character as its message (which I guess is a known RocksDB issue fixed in an
upcoming release),
or the producer times out while committing a changelog topic
record. I had increased the timeout from 30 seconds to 180 seconds, but it
still throws the exception with the longer timeout.

Not sure if these are due to a VM issue or the network.

But whenever something like this happens, the application goes into a
rebalance and things soon take a turn for the worse. Some of the threads go
into a deadlock with the above stack trace, and the application is then in a
perpetual rebalance state.

The only way to resolve this is to kill all instances using -9 and restart
them one by one.

So as long as we have a steady state of one thread per partition,
everything works fine. I am still working out a way to limit the
changelog topic size through more aggressive compaction; let me see if that
makes things better.
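
As a sketch of what "more aggressive compaction" could look like, the
topic-level settings below are the usual knobs: a lower
min.cleanable.dirty.ratio makes the log cleaner run sooner, and a shorter
segment.ms rolls segments more often (only closed segments are compacted).
The specific values and the class name are illustrative assumptions, not
recommendations from this thread, and how the overrides get applied to the
changelog topic is left open.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ChangelogCompactionSketch {
    // Topic-level overrides for a compacted changelog topic.
    // All values are illustrative and would need tuning for a real workload.
    static Map<String, String> topicOverrides() {
        Map<String, String> cfg = new LinkedHashMap<>();
        cfg.put("cleanup.policy", "compact");
        cfg.put("min.cleanable.dirty.ratio", "0.1"); // broker default is 0.5
        cfg.put("segment.ms", "3600000");            // roll segments hourly (default is 7 days)
        return cfg;
    }

    public static void main(String[] args) {
        topicOverrides().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```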

I will try to get the logs when this happens next time.

Thanks
Sachin



On Sun, Apr 9, 2017 at 6:05 PM, Eno Thereska <en...@gmail.com> wrote:

> Hi Sachin,
>
> It's not necessarily a deadlock. Do you have any debug traces from those
> nodes? Also would be useful to know the config (e.g., how many partitions
> do you have and how many app instances.)
>
> Thanks
> Eno

Re: In kafka streams consumer seems to hang while retrieving the offsets

Posted by Eno Thereska <en...@gmail.com>.
Hi Sachin,

It's not necessarily a deadlock. Do you have any debug traces from those nodes? It would also be useful to know the config (e.g., how many partitions you have and how many app instances).
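
One way to separate a true lock-cycle deadlock from a thread that is simply
waiting on network I/O (the trace shows a RUNNABLE thread inside epollWait)
is to ask the JVM directly. The following is a minimal sketch using the
standard java.lang.management API. DeadlockCheck is a hypothetical helper
that would have to run inside the Streams JVM, or be adapted to a remote JMX
connection.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class DeadlockCheck {
    // Number of threads the JVM reports as deadlocked on monitors or ownable
    // synchronizers; 0 means no lock cycle was detected.
    static int deadlockedThreadCount() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long[] ids = mx.findDeadlockedThreads(); // null when no deadlock is found
        return ids == null ? 0 : ids.length;
    }

    public static void main(String[] args) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        System.out.println("deadlocked threads: " + deadlockedThreadCount());
        // Print each thread's state; a thread blocked in a socket select
        // shows up as RUNNABLE, not BLOCKED.
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            System.out.println(info.getThreadName() + " -> " + info.getThreadState());
        }
    }
}
```

A count of zero alongside a thread that stays in the same network wait would
point toward a request that never completes rather than a lock cycle.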

Thanks
Eno
