Posted to users@kafka.apache.org by Steven Schlansker <ss...@opentable.com> on 2018/05/10 17:48:04 UTC

Consulting ReadOnlyKeyValueStore from Processor can lead to deadlock

Hello again fellow Kafkans,

Yesterday we observed a production deadlock take down one of our instances.
Upon digging, it's clear that our usage of Kafka is the proximate cause, but the
danger of our approach is not clear at all just from the Javadocs.

We have stream processors that read off an incoming KStream and
possibly cross-reference some data from an auxiliary table
via ReadOnlyKeyValueStore.get().

This is done via custom logic rather than a direct KTable join because which
index is consulted may change depending on the shape of incoming data.

The ROKVS docs state,

 * A key value store that only supports read operations.
 * Implementations should be thread-safe as concurrent reads and writes
 * are expected.

They do *not* indicate that the CachingKVS layer uses a ReadWriteLock.  If
a flush in one CachingKVS causes a read from another CKVS,
you are suddenly vulnerable to classic lock order reversals!  Surprise!
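To make the lock order reversal concrete, here is a small self-contained sketch (plain JDK, no Kafka) of what the stack trace below appears to show: each thread holds one store's write lock, as put() does while it flushes, and then asks for the other store's read lock, as the observer's get() does. The two ReentrantReadWriteLocks are stand-ins for the stores' internal locks and the class name is made up. tryLock with a timeout replaces lock() so the demo reports the cycle instead of hanging.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class LockOrderReversal {

    /** True if each thread failed to get the other's read lock, i.e. plain lock() would deadlock. */
    static boolean wouldDeadlock() {
        // Stand-ins for the internal locks of two caching stores (hypothetical, not Kafka classes).
        ReentrantReadWriteLock storeA = new ReentrantReadWriteLock();
        ReentrantReadWriteLock storeB = new ReentrantReadWriteLock();
        CyclicBarrier bothHoldWriteLocks = new CyclicBarrier(2);
        CyclicBarrier bothTriedReads = new CyclicBarrier(2);
        AtomicBoolean aReadB = new AtomicBoolean();
        AtomicBoolean bReadA = new AtomicBoolean();

        Thread t1 = new Thread(() -> putThenRead(storeA, storeB, aReadB, bothHoldWriteLocks, bothTriedReads));
        Thread t2 = new Thread(() -> putThenRead(storeB, storeA, bReadA, bothHoldWriteLocks, bothTriedReads));
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !aReadB.get() && !bReadA.get();
    }

    private static void putThenRead(ReentrantReadWriteLock own, ReentrantReadWriteLock other,
                                    AtomicBoolean gotOther, CyclicBarrier held, CyclicBarrier tried) {
        own.writeLock().lock();              // like put(): own store's write lock held during the flush
        try {
            held.await();                    // both threads now hold their own write lock
            // like the observer's get() on the other store; lock() here would park forever
            boolean ok = other.readLock().tryLock(200, TimeUnit.MILLISECONDS);
            if (ok) {
                other.readLock().unlock();
            }
            gotOther.set(ok);
            tried.await();                   // keep the write lock until both threads have tried
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        } finally {
            own.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("would deadlock: " + wouldDeadlock());
    }
}
```

With lock() in place of tryLock, both threads would stay parked, which matches the two WAITING StreamThreads in the trace.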

A partial stack trace highlighting the problem, with many uninteresting frames removed,
is inline at the bottom of this mail.

You could probably rightly point out that allowing an "observer" pattern to call back
from within streams processing code is dangerous.  We might move this off to
an auxiliary thread to alleviate this problem.
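One way to move the callbacks off the StreamThread, sketched under assumptions: the AsyncObserverDispatcher class below is hypothetical (not a Kafka API), and the observers are assumed to tolerate running asynchronously. The stream thread only enqueues the event, so it never calls get() while holding a store's write lock. The background thread holds no store lock, so at worst it waits for a write lock to be released instead of closing a cycle. How that thread should reach the stores (for example through KafkaStreams#store, the interactive-queries entry point) is left open here.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical dispatcher: processing code only enqueues events and a single
 * background thread runs the observers (which may then call get() on stores).
 */
final class AsyncObserverDispatcher<E> implements AutoCloseable {
    private final List<Consumer<E>> observers = new CopyOnWriteArrayList<>();
    // A single thread keeps callbacks in the order the events were dispatched.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "observer-dispatch");
        t.setDaemon(true);
        return t;
    });

    void addObserver(Consumer<E> observer) {
        observers.add(observer);
    }

    /** Called from the processing code; returns without running any observer inline. */
    void dispatch(E event) {
        executor.execute(() -> observers.forEach(o -> o.accept(event)));
    }

    /** Stops accepting events and waits (up to 10 s) for queued callbacks to finish. */
    @Override
    public void close() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The trade-off of this design is that observers see each update slightly after the stream thread has processed it, rather than synchronously inside peek().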

But it still remains -- when you go and read that ROKVS documentation, it sure
doesn't prepare you for this possibility!  And, it's a little frustrating that
we have to have this 'caching' layer at all -- we already had to add

        // ensure KTable doesn't delay updates due to buffering in cache
        kafkaStreamProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

because we are quite latency sensitive.  But there's seemingly no way to remove the
undesired cache and lock entirely, as it's hard coded in a number of places.

Should I file a ticket for this?  What do you see as the best way for the library to improve
so that future developers don't hit this as badly as I did?

Thanks for any thoughts,
Steven


"chat-80543cf6-3ae3-4a32-97a9-a85571934595-StreamThread-1" - Thread t@79
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - waiting to lock <4db7ff53> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync) owned by "chat-80543cf6-3ae3-4a32-97a9-a85571934595-StreamThread-4" t@85
        at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:157)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
        at com.opentable.chat.service.LocalChatStorage.findById(LocalChatStorage.java:361)
        at com.opentable.chat.service.LocalChatStorage.request(LocalChatStorage.java:395)
        at com.opentable.chat.action.WelcomeAction.onEndpointUpdate(WelcomeAction.java:71)
        at com.opentable.chat.service.LocalChatStorage.lambda$handleEndpointUpdate$31(LocalChatStorage.java:1022)
        at com.opentable.chat.service.LocalChatStorage$$Lambda$466/1976342649.test(Unknown Source)
        at com.opentable.chat.service.LocalChatStorage.dispatchObservers(LocalChatStorage.java:1170)
        at com.opentable.chat.service.LocalChatStorage.handleEndpointUpdate(LocalChatStorage.java:1022)
        at com.opentable.chat.service.ChatPipeline$$Lambda$364/1199894489.apply(Unknown Source)
        at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
        at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:233)
        - locked <a4beebf> (a org.apache.kafka.streams.state.internals.NamedCache)
        at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:243)
        at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:153)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:228)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:221)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
        at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)

   Locked ownable synchronizers:
        - locked <4397f604> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)

"chat-80543cf6-3ae3-4a32-97a9-a85571934595-StreamThread-4" - Thread t@85
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - waiting to lock <4397f604> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync) owned by "chat-80543cf6-3ae3-4a32-97a9-a85571934595-StreamThread-1" t@79
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:157)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
        at com.opentable.chat.service.LocalChatStorage.getEndpoint(LocalChatStorage.java:880)
        at com.opentable.chat.service.LocalChatStorage.getEndpoint(LocalChatStorage.java:894)
        at com.opentable.chat.action.WelcomeAction.onMessageUpdate(WelcomeAction.java:101)
        at com.opentable.chat.service.LocalChatStorage.lambda$handleMessageUpdate$34(LocalChatStorage.java:1142)
        at com.opentable.chat.service.LocalChatStorage$$Lambda$463/1420253975.test(Unknown Source)
        at com.opentable.chat.service.LocalChatStorage.dispatchObservers(LocalChatStorage.java:1170)
        at com.opentable.chat.service.LocalChatStorage.handleMessageUpdate(LocalChatStorage.java:1142)
        at com.opentable.chat.service.ChatPipeline$$Lambda$366/1134976216.apply(Unknown Source)
        at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:40)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
        at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:233)
        - locked <2a59610> (a org.apache.kafka.streams.state.internals.NamedCache)
        at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:243)
        at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:153)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:228)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:221)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
        at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)

   Locked ownable synchronizers:
        - locked <4db7ff53> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)



Re: Consulting ReadOnlyKeyValueStore from Processor can lead to deadlock

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Created a Jira for each:

 - https://issues.apache.org/jira/browse/KAFKA-6998
 - https://issues.apache.org/jira/browse/KAFKA-6999


-Matthias

On 5/11/18 10:06 AM, Guozhang Wang wrote:
> Hello Steven, thanks for pointing it out. I think both of the mentioned
> issues are worth improving:
> 
> 1. The read-write lock documentation for caching enabled stores.
> 2. When CACHE_MAX_BYTES_BUFFERING_CONFIG is set to 0, we should
> automatically disable the dummy caching layer in all stores as it is not
> only unnecessary but also wastes CPU (i.e. you are enforcing a
> flush on each put operation).
> 
> Would you mind filing a JIRA for both of them? And I'd appreciate it if you are
> willing to tackle 1) above :)
> 
> Guozhang


Re: Consulting ReadOnlyKeyValueStore from Processor can lead to deadlock

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Steven, thanks for pointing it out. I think both of the mentioned
issues are worth improving:

1. The read-write lock documentation for caching enabled stores.
2. When CACHE_MAX_BYTES_BUFFERING_CONFIG is set to 0, we should
automatically disable the dummy caching layer in all stores as it is not
only unnecessary but also wastes CPU (i.e. you are enforcing a
flush on each put operation).
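Point 2 can be illustrated with a toy write-back cache. ToyWriteCache is hypothetical: it counts entries rather than bytes and is not Kafka's ThreadCache. With a capacity of 0, every put() is evicted and flushed downstream immediately, so the cache only adds overhead; with room to buffer, repeated updates to the same key coalesce into one downstream record. In the stack traces above, that per-put eviction (ThreadCache.put into maybeEvict into NamedCache.flush) is where the record gets forwarded to the observer.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Toy model, not Kafka's ThreadCache: dirty entries are buffered until the
 * cache holds more than maxEntries, then the eldest ones are flushed downstream.
 */
final class ToyWriteCache<K, V> {
    private final int maxEntries;
    private final BiConsumer<K, V> downstream;
    private final LinkedHashMap<K, V> dirty = new LinkedHashMap<>();
    private int flushes;

    ToyWriteCache(int maxEntries, BiConsumer<K, V> downstream) {
        this.maxEntries = maxEntries;
        this.downstream = downstream;
    }

    void put(K key, V value) {
        dirty.remove(key);                   // re-insert so the key becomes the newest entry
        dirty.put(key, value);
        while (dirty.size() > maxEntries) {  // with maxEntries == 0 this evicts on every put
            Map.Entry<K, V> eldest = dirty.entrySet().iterator().next();
            K k = eldest.getKey();
            V v = eldest.getValue();
            dirty.remove(k);
            downstream.accept(k, v);
            flushes++;
        }
    }

    /** Flushes everything still buffered, e.g. on commit. */
    void flushAll() {
        dirty.forEach((k, v) -> {
            downstream.accept(k, v);
            flushes++;
        });
        dirty.clear();
    }

    int flushes() {
        return flushes;
    }
}
```

For example, three puts to the same key cause three downstream flushes with a capacity of 0, but only one (on flushAll) when the cache has room.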

Would you mind filing a JIRA for both of them? And I'd appreciate it if you are
willing to tackle 1) above :)

Guozhang


On Thu, May 10, 2018 at 7:21 PM, Ted Yu <yu...@gmail.com> wrote:

> bq. the docs and CachingKVS behavior could improve
>
> I would agree.
>
> Pointing out the usage of ReadWriteLock and mentioning the
> withCachingDisabled() method in the docs would help other developers.
>
> On Thu, May 10, 2018 at 11:21 AM, Steven Schlansker <sschlansker@opentable.com> wrote:
>
> >
> > > On May 10, 2018, at 10:48 AM, Steven Schlansker <sschlansker@opentable.com> wrote:
> > >
> > > But it still remains -- when you go and read that ROKVS documentation, it sure
> > > doesn't prepare you for this possibility!  And, it's a little frustrating that
> > > we have to have this 'caching' layer at all -- we already had to add
> > >
> > >        // ensure KTable doesn't delay updates due to buffering in cache
> > >        kafkaStreamProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >
> > Now that I've said this, it seems that since I last checked we got
> > 'Materialized.withCachingDisabled'.
> > I'll see if that does what I want...  (I still think the docs and
> > CachingKVS behavior could improve, though.)
> >
> >
>



-- 
-- Guozhang

Re: Consulting ReadOnlyKeyValueStore from Processor can lead to deadlock

Posted by Ted Yu <yu...@gmail.com>.
bq. the docs and CachingKVS behavior could improve

I would agree.

Pointing out the usage of ReadWriteLock and mentioning the
withCachingDisabled() method in the docs would help other developers.

On Thu, May 10, 2018 at 11:21 AM, Steven Schlansker <sschlansker@opentable.com> wrote:

>
> > On May 10, 2018, at 10:48 AM, Steven Schlansker <sschlansker@opentable.com> wrote:
> >
> > But it still remains -- when you go and read that ROKVS documentation, it sure
> > doesn't prepare you for this possibility!  And, it's a little frustrating that
> > we have to have this 'caching' layer at all -- we already had to add
> >
> >        // ensure KTable doesn't delay updates due to buffering in cache
> >        kafkaStreamProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>
> Now that I've said this, it seems that since I last checked we got
> 'Materialized.withCachingDisabled'.
> I'll see if that does what I want...  (I still think the docs and
> CachingKVS behavior could improve, though.)
>
>

Re: Consulting ReadOnlyKeyValueStore from Processor can lead to deadlock

Posted by Steven Schlansker <ss...@opentable.com>.
> On May 10, 2018, at 10:48 AM, Steven Schlansker <ss...@opentable.com> wrote:
> 
> But it still remains -- when you go and read that ROKVS documentation, it sure
> doesn't prepare you for this possibility!  And, it's a little frustrating that
> we have to have this 'caching' layer at all -- we already had to add
> 
>        // ensure KTable doesn't delay updates due to buffering in cache
>        kafkaStreamProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

Now that I've said this, it seems that since I last checked we got 'Materialized.withCachingDisabled'.
I'll see if that does what I want...  (I still think the docs and CachingKVS behavior could improve, though.)
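For reference, a minimal sketch of what Steven describes trying, assuming the Kafka Streams DSL from 1.0 onward: a KTable materialized with Materialized.withCachingDisabled(), which should leave the CachingKeyValueStore layer (and its ReadWriteLock) out of that one store. The topic name "endpoints", the store name "endpoint-store", the String serdes and the class name are made up for illustration, and the kafka-streams library must be on the classpath. Unlike CACHE_MAX_BYTES_BUFFERING_CONFIG = 0, which per Guozhang's reply still keeps the caching layer in place, this opts the store out of it.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

final class UncachedTableTopology {
    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        // Hypothetical topic/store names; caching is switched off for this store only.
        KTable<String, String> endpoints = builder.table(
                "endpoints",
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("endpoint-store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String())
                        .withCachingDisabled());
        // The rest of the topology (peek() observers, lookups against endpoint-store, ...) goes here.
        return builder.build();
    }
}
```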