Posted to dev@storm.apache.org by miofthena <gi...@git.apache.org> on 2014/06/08 00:57:08 UTC

[GitHub] incubator-storm pull request: LMAX Disruptor 3.2.1

GitHub user miofthena opened a pull request:

    https://github.com/apache/incubator-storm/pull/134

    LMAX Disruptor 3.2.1

    Use latest version of disruptor

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/miofthena/incubator-storm disruptor-update

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-storm/pull/134.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #134
    
----
commit db16c1dfd0cf41a6d6e7f105ef0b121f5bec0642
Author: Boris Aksenov <ak...@corp.finam.ru>
Date:   2014-06-07T22:55:03Z

    use latest version of disruptor 3.2.1

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-storm pull request: STORM-350: LMAX Disruptor 3.2.1

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on the pull request:

    https://github.com/apache/incubator-storm/pull/134#issuecomment-46619681
  
    This looks fine to me overall.  Just one question about the default WaitStrategy.



[GitHub] incubator-storm pull request: STORM-350: LMAX Disruptor 3.2.1

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/incubator-storm/pull/134#issuecomment-49432886
  
    The tests still all pass and the code looks OK to me, so I am a +1, but I would like to understand the timeout question a little better. I'll try to check this in on Monday unless anyone objects or has outstanding questions.



[GitHub] incubator-storm pull request: STORM-350: LMAX Disruptor 3.2.1

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-storm/pull/134



[GitHub] incubator-storm pull request: STORM-350: LMAX Disruptor 3.2.1

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/134#discussion_r13994933
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java ---
    @@ -41,46 +37,47 @@
      * the ability to catch up to the producer by processing tuples in batches.
      */
     public class DisruptorQueue implements IStatefulObject {
    -    static final Object FLUSH_CACHE = new Object();
    -    static final Object INTERRUPT = new Object();
    -    
    -    RingBuffer<MutableObject> _buffer;
    -    Sequence _consumer;
    -    SequenceBarrier _barrier;
    -    
    +    private static final Object FLUSH_CACHE = new Object();
    +    private static final Object INTERRUPT = new Object();
    +    private static final String PREFIX = "disruptor-";
    +
    +    private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();
    +    private final HashMap<String, Object> state = new HashMap<String, Object>(4);
    +
    +    private final String _queueName;
    +    private final RingBuffer<MutableObject> _buffer;
    +    private final Sequence _consumer;
    +    private final SequenceBarrier _barrier;
    +
         // TODO: consider having a threadlocal cache of this variable to speed up reads?
         volatile boolean consumerStartedFlag = false;
    -    ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
    -    private static String PREFIX = "disruptor-";
    -    private String _queueName = "";
    -    
    -    public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait) {
    -         this._queueName = PREFIX + queueName;
    -        _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
    +
    +    public DisruptorQueue(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
    +        _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
    +        _queueName = PREFIX + queueName;
             _consumer = new Sequence();
             _barrier = _buffer.newBarrier();
    -        _buffer.setGatingSequences(_consumer);
    -        if(claim instanceof SingleThreadedClaimStrategy) {
    -            consumerStartedFlag = true;
    -        }
    +        _buffer.addGatingSequences(_consumer);
    +        consumerStartedFlag = producerType == ProducerType.SINGLE;
         }
    -    
    +
         public String getName() {
    -      return _queueName;
    +        return _queueName;
         }
    -    
    +
    +
         public void consumeBatch(EventHandler<Object> handler) {
             consumeBatchToCursor(_barrier.getCursor(), handler);
         }
    -    
    +
         public void haltWithInterrupt() {
             publish(INTERRUPT);
         }
    -    
    +
         public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
             try {
                 final long nextSequence = _consumer.get() + 1;
    -            final long availableSequence = _barrier.waitFor(nextSequence, 10, TimeUnit.MILLISECONDS);
    +            final long availableSequence = _barrier.waitFor(nextSequence);
    --- End diff --
    
    Prior to this change we defaulted to the BlockingWaitStrategy and passed a 10 ms timeout to waitFor.  Now that timeouts are part of the wait strategy, should we change the default to a TimeoutBlockingWaitStrategy with a 10 ms timeout?
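
To make the question concrete, here is a minimal sketch of the difference between waiting indefinitely and waiting with a timeout. It is not Storm or Disruptor code: the class TimedWaitSketch and its methods are hypothetical and use only java.util.concurrent. It models a consumer that waits for a sequence to be published and gives up after a timeout, which is the behaviour the removed 10 ms argument to waitFor provided and which a timeout-based wait strategy would presumably restore.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a consumer barrier; not part of Storm or the Disruptor. */
final class TimedWaitSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition published = lock.newCondition();
    private long cursor = -1L; // highest published sequence, guarded by lock

    /** Producer side: advance the cursor and wake any waiting consumer. */
    void publish() {
        lock.lock();
        try {
            cursor++;
            published.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Consumer side: block until the given sequence has been published,
     * or throw TimeoutException once the timeout has elapsed.
     */
    long waitFor(long sequence, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (cursor < sequence) {
                if (nanos <= 0L) {
                    throw new TimeoutException("sequence " + sequence + " not published in time");
                }
                // awaitNanos returns an estimate of the time still remaining
                nanos = published.awaitNanos(nanos);
            }
            return cursor;
        } finally {
            lock.unlock();
        }
    }
}
```

A caller would typically catch the TimeoutException and loop again, so the consumer thread regains control periodically even when nothing is published. With a purely blocking wait, the thread stays parked until a producer signals it.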

