You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/31 02:23:20 UTC
[5/8] git commit: code format
code format
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/7d50fbe0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/7d50fbe0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/7d50fbe0
Branch: refs/heads/master
Commit: 7d50fbe0a2191b634dc71cd9e7c35d5af24bf063
Parents: b47932d
Author: Boris Aksenov <ak...@corp.finam.ru>
Authored: Sat Jul 5 16:04:09 2014 +0400
Committer: Boris Aksenov <ak...@corp.finam.ru>
Committed: Sat Jul 5 16:04:09 2014 +0400
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java | 7 ++-----
1 file changed, 2 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d50fbe0/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 25bfcf8..d495ccf 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -45,8 +45,6 @@ public class DisruptorQueue implements IStatefulObject {
private static final Object INTERRUPT = new Object();
private static final String PREFIX = "disruptor-";
-
-
private final String _queueName;
private final RingBuffer<MutableObject> _buffer;
private final Sequence _consumer;
@@ -54,14 +52,13 @@ public class DisruptorQueue implements IStatefulObject {
// TODO: consider having a threadlocal cache of this variable to speed up reads?
volatile boolean consumerStartedFlag = false;
+
private final HashMap<String, Object> state = new HashMap<String, Object>(4);
-
private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();
-
-
private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
private final Lock readLock = cacheLock.readLock();
private final Lock writeLock = cacheLock.writeLock();
+
public DisruptorQueue(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
this._queueName = PREFIX + queueName;
_buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);