Posted to dev@storm.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2014/06/19 23:27:25 UTC

[jira] [Commented] (STORM-350) Update disruptor to latest version (3.2.1)

    [ https://issues.apache.org/jira/browse/STORM-350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14037916#comment-14037916 ] 

ASF GitHub Bot commented on STORM-350:
--------------------------------------

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/134#discussion_r13994933
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java ---
    @@ -41,46 +37,47 @@
      * the ability to catch up to the producer by processing tuples in batches.
      */
     public class DisruptorQueue implements IStatefulObject {
    -    static final Object FLUSH_CACHE = new Object();
    -    static final Object INTERRUPT = new Object();
    -    
    -    RingBuffer<MutableObject> _buffer;
    -    Sequence _consumer;
    -    SequenceBarrier _barrier;
    -    
    +    private static final Object FLUSH_CACHE = new Object();
    +    private static final Object INTERRUPT = new Object();
    +    private static final String PREFIX = "disruptor-";
    +
    +    private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();
    +    private final HashMap<String, Object> state = new HashMap<String, Object>(4);
    +
    +    private final String _queueName;
    +    private final RingBuffer<MutableObject> _buffer;
    +    private final Sequence _consumer;
    +    private final SequenceBarrier _barrier;
    +
         // TODO: consider having a threadlocal cache of this variable to speed up reads?
         volatile boolean consumerStartedFlag = false;
    -    ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
    -    private static String PREFIX = "disruptor-";
    -    private String _queueName = "";
    -    
    -    public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait) {
    -         this._queueName = PREFIX + queueName;
    -        _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
    +
    +    public DisruptorQueue(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
    +        _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
    +        _queueName = PREFIX + queueName;
             _consumer = new Sequence();
             _barrier = _buffer.newBarrier();
    -        _buffer.setGatingSequences(_consumer);
    -        if(claim instanceof SingleThreadedClaimStrategy) {
    -            consumerStartedFlag = true;
    -        }
    +        _buffer.addGatingSequences(_consumer);
    +        consumerStartedFlag = producerType == ProducerType.SINGLE;
         }
    -    
    +
         public String getName() {
    -      return _queueName;
    +        return _queueName;
         }
    -    
    +
    +
         public void consumeBatch(EventHandler<Object> handler) {
             consumeBatchToCursor(_barrier.getCursor(), handler);
         }
    -    
    +
         public void haltWithInterrupt() {
             publish(INTERRUPT);
         }
    -    
    +
         public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
             try {
                 final long nextSequence = _consumer.get() + 1;
    -            final long availableSequence = _barrier.waitFor(nextSequence, 10, TimeUnit.MILLISECONDS);
    +            final long availableSequence = _barrier.waitFor(nextSequence);
    --- End diff --
    
    Prior to this change we defaulted to BlockingWaitStrategy, and the 10 ms timeout was passed to _barrier.waitFor directly. Now that the timeout is part of the wait strategy, should we change the default to a TimeoutBlockingWaitStrategy with a timeout of 10 ms?
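
    To make the difference concrete, here is a minimal sketch of a bounded wait, written only against java.util.concurrent. It is a stand-in for illustration and not the Disruptor API: TimedCursorWait, NOT_AVAILABLE, publish, and waitFor are hypothetical names. The point it tries to show is that a wait with a timeout hands control back to the consumer after at most the timeout even when nothing has been published, whereas an unbounded wait returns only once a producer publishes or the waiting thread is interrupted.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical stand-in for a blocking wait with a timeout.
 * Not the Disruptor API; it only mimics the "wait for a sequence" idea.
 */
final class TimedCursorWait {
    /** Sentinel returned when the requested sequence did not become available. */
    static final long NOT_AVAILABLE = Long.MIN_VALUE;

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition advanced = lock.newCondition();
    private long cursor = -1L; // highest published sequence, guarded by lock

    /** Producer side: record the published sequence and wake waiting consumers. */
    void publish(long sequence) {
        lock.lock();
        try {
            cursor = sequence;
            advanced.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Consumer side: block until the cursor reaches the requested sequence,
     * or give up once the timeout has elapsed.
     */
    long waitFor(long sequence, long timeout, TimeUnit unit) {
        long nanosLeft = unit.toNanos(timeout);
        lock.lock();
        try {
            while (cursor < sequence) {
                if (nanosLeft <= 0L) {
                    return NOT_AVAILABLE; // timed out, caller regains control
                }
                // awaitNanos returns the remaining time, so spurious wakeups
                // do not extend the overall deadline.
                nanosLeft = advanced.awaitNanos(nanosLeft);
            }
            return cursor;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return NOT_AVAILABLE;
        } finally {
            lock.unlock();
        }
    }
}
```

    With a 10 ms timeout, a consumer looping on waitFor gets a chance to do other work between calls even on an idle queue. Whether that is the right default for DisruptorQueue is the open question in this comment.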


> Update disruptor to latest version (3.2.1)
> ------------------------------------------
>
>                 Key: STORM-350
>                 URL: https://issues.apache.org/jira/browse/STORM-350
>             Project: Apache Storm (Incubating)
>          Issue Type: Dependency upgrade
>            Reporter: Boris Aksenov
>




--
This message was sent by Atlassian JIRA
(v6.2#6252)