You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by d2r <gi...@git.apache.org> on 2014/06/19 23:27:21 UTC

[GitHub] incubator-storm pull request: STORM-350: LMAX Disruptor 3.2.1

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/134#discussion_r13994933
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java ---
    @@ -41,46 +37,47 @@
      * the ability to catch up to the producer by processing tuples in batches.
      */
     public class DisruptorQueue implements IStatefulObject {
    -    static final Object FLUSH_CACHE = new Object();
    -    static final Object INTERRUPT = new Object();
    -    
    -    RingBuffer<MutableObject> _buffer;
    -    Sequence _consumer;
    -    SequenceBarrier _barrier;
    -    
    +    private static final Object FLUSH_CACHE = new Object();
    +    private static final Object INTERRUPT = new Object();
    +    private static final String PREFIX = "disruptor-";
    +
    +    private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();
    +    private final HashMap<String, Object> state = new HashMap<String, Object>(4);
    +
    +    private final String _queueName;
    +    private final RingBuffer<MutableObject> _buffer;
    +    private final Sequence _consumer;
    +    private final SequenceBarrier _barrier;
    +
         // TODO: consider having a threadlocal cache of this variable to speed up reads?
         volatile boolean consumerStartedFlag = false;
    -    ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
    -    private static String PREFIX = "disruptor-";
    -    private String _queueName = "";
    -    
    -    public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait) {
    -         this._queueName = PREFIX + queueName;
    -        _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
    +
    +    public DisruptorQueue(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
    +        _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
    +        _queueName = PREFIX + queueName;
             _consumer = new Sequence();
             _barrier = _buffer.newBarrier();
    -        _buffer.setGatingSequences(_consumer);
    -        if(claim instanceof SingleThreadedClaimStrategy) {
    -            consumerStartedFlag = true;
    -        }
    +        _buffer.addGatingSequences(_consumer);
    +        consumerStartedFlag = producerType == ProducerType.SINGLE;
         }
    -    
    +
         public String getName() {
    -      return _queueName;
    +        return _queueName;
         }
    -    
    +
    +
         public void consumeBatch(EventHandler<Object> handler) {
             consumeBatchToCursor(_barrier.getCursor(), handler);
         }
    -    
    +
         public void haltWithInterrupt() {
             publish(INTERRUPT);
         }
    -    
    +
         public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
             try {
                 final long nextSequence = _consumer.get() + 1;
    -            final long availableSequence = _barrier.waitFor(nextSequence, 10, TimeUnit.MILLISECONDS);
    +            final long availableSequence = _barrier.waitFor(nextSequence);
    --- End diff --
    
    Prior to this change we defaulted to the BlockingWaitStrategy.  Now that timeouts are a part of the strategy, should we make a change to default to a TimeoutBlockingWaitStrategy with a timeout of 10ms?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---