Posted to dev@storm.apache.org by "Sean Zhong (JIRA)" <ji...@apache.org> on 2014/06/10 01:59:02 UTC

[jira] [Updated] (STORM-342) Contention in Disruptor Queue which may cause message loss or out of order

     [ https://issues.apache.org/jira/browse/STORM-342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Zhong updated STORM-342:
-----------------------------

    Summary: Contention in Disruptor Queue which may cause message loss or out of order  (was: Contention in Disruptor Queue which may cause message out of order)

> Contention in Disruptor Queue which may cause message loss or out of order
> --------------------------------------------------------------------------
>
>                 Key: STORM-342
>                 URL: https://issues.apache.org/jira/browse/STORM-342
>             Project: Apache Storm (Incubating)
>          Issue Type: Bug
>            Reporter: Sean Zhong
>            Priority: Blocker
>
> Disruptor contains a potential contention bug between the consumer and the producer. It can cause the consumer queue to hang, messages to be lost, or messages to arrive out of order.
> {code:title=Disruptor.java|borderStyle=solid}
> class Disruptor {
> ...
>     public void publish(Object obj, boolean block) throws InsufficientCapacityException {
>         if(consumerStartedFlag) {
>             // fast path: consumer is running, publish straight into the RingBuffer
>             final long id;
>             if(block) {
>                 id = _buffer.next();
>             } else {
>                 id = _buffer.tryNext(1);
>             }
>             final MutableObject m = _buffer.get(id);
>             m.setObject(obj);
>             _buffer.publish(id);
>         } else {
>             // slow path: consumer not started yet, park the message in the cache
>             _cache.add(obj);
>             if(consumerStartedFlag) flushCache();
>         }
>     }
>     
>     public void consumerStarted() {
>         if(!consumerStartedFlag) {
>             consumerStartedFlag = true;
>             // race window: a publisher can already observe the flag as true
>             // here, before flushCache() has drained the cache
>             flushCache();
>         }
>     }
> }
> {code}
> The following steps describe a scenario in which messages arrive out of order:
> 1. The consumer has not started yet.
> 2. A producer in another thread publishes message "1"; since "consumerStartedFlag" == false, the message goes into the cache.
> 3. consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
> 4. The producer in another thread publishes message "2"; since "consumerStartedFlag" == true now, the message goes directly into the RingBuffer.
> 5. flushCache() is called in consumerStarted().
> 6. The FLUSH_CACHE object is published to the RingBuffer; it marks the position of message "1" in the RingBuffer.
> 7. consume() is called; it first fetches "2", then "1".
> 8. The message order is wrong!
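> For illustration, here is a minimal, self-contained sketch of this interleaving. It is a hypothetical stand-in (the class, field, and latch names are illustrative, not the real Storm or Disruptor classes) that uses latches to force the racy schedule deterministically; running it prints the two messages out of order:
> {code:title=DisorderDemo.java|borderStyle=solid}
> import java.util.List;
> import java.util.concurrent.ConcurrentLinkedQueue;
> import java.util.concurrent.CopyOnWriteArrayList;
> import java.util.concurrent.CountDownLatch;
>
> public class DisorderDemo {
>     static volatile boolean consumerStartedFlag = false;
>     static final ConcurrentLinkedQueue<Object> cache = new ConcurrentLinkedQueue<>();
>     static final List<Object> ring = new CopyOnWriteArrayList<>(); // stand-in for the RingBuffer
>
>     // the latches only force the interleaving described in steps 1-8 above
>     static final CountDownLatch flagSet = new CountDownLatch(1);
>     static final CountDownLatch secondPublished = new CountDownLatch(1);
>
>     static void publish(Object obj) {
>         if (consumerStartedFlag) {
>             ring.add(obj);                    // step 4: "2" goes straight into the ring
>         } else {
>             cache.add(obj);                   // step 2: "1" goes into the cache
>             if (consumerStartedFlag) flushCache();
>         }
>     }
>
>     static void consumerStarted() throws InterruptedException {
>         consumerStartedFlag = true;           // step 3: the flag flips first...
>         flagSet.countDown();
>         secondPublished.await();              // ...while flushCache() has not run yet
>         flushCache();                         // steps 5-6: "1" lands after "2"
>     }
>
>     static void flushCache() {
>         Object o;
>         while ((o = cache.poll()) != null) ring.add(o);
>     }
>
>     public static void main(String[] args) throws InterruptedException {
>         publish("1");                         // steps 1-2
>         Thread consumer = new Thread(() -> {
>             try { consumerStarted(); } catch (InterruptedException ignored) { }
>         });
>         consumer.start();
>         flagSet.await();
>         publish("2");                         // step 4
>         secondPublished.countDown();
>         consumer.join();
>         System.out.println(ring);             // prints [2, 1] -- out of order
>     }
> }
> {code}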
> The following steps describe a scenario in which messages are lost and the consumer thread hangs forever:
> 1. The consumer has not started yet.
> 2. A producer in another thread publishes message "1"; since "consumerStartedFlag" == false, the message goes into the cache.
> 3. consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
> 4. The producer in another thread publishes multiple messages; since "consumerStartedFlag" == true now, they go directly into the RingBuffer, until the RingBuffer is full.
> 5. flushCache() is called in consumerStarted().
> 6. The FLUSH_CACHE object is published to the RingBuffer; since the RingBuffer is now full, the consumer thread blocks.
> 7. consume() is never called, so the consumer thread stays blocked forever.
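> One possible way to close this window is sketched below. This is only a sketch, not the actual patch: it assumes flushCache() synchronously drains _cache into the RingBuffer, and publishDirect() is hypothetical shorthand for the fast-path body of publish() above. The idea is to flip the flag only after the cache is drained, and to guard the cache path with a lock so a publisher can never strand a message there:
> {code:title=PossibleFix.java|borderStyle=solid}
> private final Object cacheLock = new Object();
>
> public void publish(Object obj, boolean block) throws InsufficientCapacityException {
>     if (consumerStartedFlag) {
>         // the flag is flipped only after the flush below, so seeing it as
>         // true guarantees the cached messages are already in the RingBuffer
>         publishDirect(obj, block);
>     } else {
>         synchronized (cacheLock) {
>             if (consumerStartedFlag) {
>                 publishDirect(obj, block); // flag flipped while we waited on the lock
>             } else {
>                 _cache.add(obj);           // safe: flushCache() cannot run concurrently
>             }
>         }
>     }
> }
>
> public void consumerStarted() {
>     synchronized (cacheLock) {
>         flushCache();                      // drain the cache first...
>         consumerStartedFlag = true;        // ...then open the lock-free fast path
>     }
> }
> {code}
> The lock on the cache path only matters before the consumer starts; once the flag is true, publishers take the lock-free fast path, so steady-state throughput should not be affected.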
> I found this while troubleshooting a tricky random failure (about 1 in 100 runs). It usually happens when the producer and consumer are colocated in the same process, for example, when the task send-queue thread acts as the producer and produces messages into the local task receive queue of the same worker.



--
This message was sent by Atlassian JIRA
(v6.2#6252)