Posted to dev@storm.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2014/06/19 23:27:25 UTC
[jira] [Commented] (STORM-350) Update disruptor to latest version (3.2.1)
[ https://issues.apache.org/jira/browse/STORM-350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14037916#comment-14037916 ]
ASF GitHub Bot commented on STORM-350:
--------------------------------------
Github user d2r commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/134#discussion_r13994933
--- Diff: storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java ---
@@ -41,46 +37,47 @@
* the ability to catch up to the producer by processing tuples in batches.
*/
public class DisruptorQueue implements IStatefulObject {
- static final Object FLUSH_CACHE = new Object();
- static final Object INTERRUPT = new Object();
-
- RingBuffer<MutableObject> _buffer;
- Sequence _consumer;
- SequenceBarrier _barrier;
-
+ private static final Object FLUSH_CACHE = new Object();
+ private static final Object INTERRUPT = new Object();
+ private static final String PREFIX = "disruptor-";
+
+ private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();
+ private final HashMap<String, Object> state = new HashMap<String, Object>(4);
+
+ private final String _queueName;
+ private final RingBuffer<MutableObject> _buffer;
+ private final Sequence _consumer;
+ private final SequenceBarrier _barrier;
+
// TODO: consider having a threadlocal cache of this variable to speed up reads?
volatile boolean consumerStartedFlag = false;
- ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
- private static String PREFIX = "disruptor-";
- private String _queueName = "";
-
- public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait) {
- this._queueName = PREFIX + queueName;
- _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
+
+ public DisruptorQueue(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
+ _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
+ _queueName = PREFIX + queueName;
_consumer = new Sequence();
_barrier = _buffer.newBarrier();
- _buffer.setGatingSequences(_consumer);
- if(claim instanceof SingleThreadedClaimStrategy) {
- consumerStartedFlag = true;
- }
+ _buffer.addGatingSequences(_consumer);
+ consumerStartedFlag = producerType == ProducerType.SINGLE;
}
-
+
public String getName() {
- return _queueName;
+ return _queueName;
}
-
+
+
public void consumeBatch(EventHandler<Object> handler) {
consumeBatchToCursor(_barrier.getCursor(), handler);
}
-
+
public void haltWithInterrupt() {
publish(INTERRUPT);
}
-
+
public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
try {
final long nextSequence = _consumer.get() + 1;
- final long availableSequence = _barrier.waitFor(nextSequence, 10, TimeUnit.MILLISECONDS);
+ final long availableSequence = _barrier.waitFor(nextSequence);
--- End diff --
Prior to this change we defaulted to the BlockingWaitStrategy, and the 10 ms timeout was passed to waitFor directly. Now that timeouts are part of the wait strategy, should we change the default to a TimeoutBlockingWaitStrategy with a 10 ms timeout?
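To make the question concrete, here is a minimal stdlib-only sketch of what a timeout-based blocking wait does on the consumer side. It is an analogy, not the Disruptor implementation: the class `TimedWaitSketch` and its `publish`/`waitFor` methods are hypothetical names invented for illustration, and the lock-and-condition design is an assumption about how such a wait could be built.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a cursor guarded by a timeout-based blocking wait. */
final class TimedWaitSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition published = lock.newCondition();
    private long cursor = -1L; // highest published sequence; -1 means nothing published yet

    /** Producer side: advance the cursor and wake any waiting consumer. */
    void publish(long sequence) {
        lock.lock();
        try {
            cursor = sequence;
            published.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Consumer side: block until the cursor reaches the requested sequence,
     * or give up with a TimeoutException once the timeout has elapsed.
     */
    long waitFor(long sequence, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (cursor < sequence) {
                if (nanos <= 0L) {
                    throw new TimeoutException("sequence " + sequence + " not published in time");
                }
                // awaitNanos returns an estimate of the time still remaining
                nanos = published.awaitNanos(nanos);
            }
            return cursor;
        } finally {
            lock.unlock();
        }
    }
}
```

With a purely blocking wait the consumer would stay parked until the next publish. With the timed variant it regains control after at most the timeout, which is the behavior the old `waitFor(nextSequence, 10, TimeUnit.MILLISECONDS)` call gave us. In Disruptor 3.x the corresponding change would, I believe, be to construct a `TimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS)` and pass it as the `wait` argument, with the consumer handling the resulting timeout.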
> Update disruptor to latest version (3.2.1)
> ------------------------------------------
>
> Key: STORM-350
> URL: https://issues.apache.org/jira/browse/STORM-350
> Project: Apache Storm (Incubating)
> Issue Type: Dependency upgrade
> Reporter: Boris Aksenov
>
--
This message was sent by Atlassian JIRA
(v6.2#6252)