You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/08 18:52:07 UTC
svn commit: r1566069 [2/4] - in
/qpid/branches/java-broker-amqp-1-0-management/java:
broker-core/src/main/java/org/apache/qpid/server/exchange/
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/m...
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Sat Feb 8 17:52:05 2014
@@ -47,7 +47,7 @@ import java.util.concurrent.locks.Reentr
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
-class QueueConsumer<T extends ConsumerTarget> implements Consumer
+class QueueConsumer<T extends ConsumerTarget, E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements Consumer
{
public static enum State
@@ -61,10 +61,9 @@ class QueueConsumer<T extends ConsumerTa
private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final long _id;
- private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
private final Lock _stateChangeLock = new ReentrantLock();
private final long _createTime = System.currentTimeMillis();
- private final MessageInstance.ConsumerAcquiredState _owningState = new MessageInstance.ConsumerAcquiredState(this);
+ private final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>>(this);
private final boolean _acquires;
private final boolean _seesRequeues;
private final String _consumerName;
@@ -74,8 +73,10 @@ class QueueConsumer<T extends ConsumerTa
private final FilterManager _filters;
private final Class<? extends ServerMessage> _messageClass;
private final Object _sessionReference;
- private SimpleAMQQueue _queue;
- private GenericActor _logActor;
+ private Q _queue;
+ private GenericActor _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId())
+ + "(UNKNOWN)"
+ + "] ");
static final EnumMap<ConsumerTarget.State, State> STATE_MAP =
new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class);
@@ -89,10 +90,10 @@ class QueueConsumer<T extends ConsumerTa
private final T _target;
private final SubFlushRunner _runner = new SubFlushRunner(this);
- private volatile QueueContext _queueContext;
- private StateChangeListener<? extends Consumer, State> _stateListener = new StateChangeListener<Consumer, State>()
+ private volatile QueueContext<E,Q,L> _queueContext;
+ private StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> _stateListener = new StateChangeListener<QueueConsumer<T,E,Q,L>, State>()
{
- public void stateChanged(Consumer sub, State oldState, State newState)
+ public void stateChanged(QueueConsumer sub, State oldState, State newState)
{
CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
}
@@ -158,8 +159,7 @@ class QueueConsumer<T extends ConsumerTa
throw new RuntimeException(e);
}
}
- final StateChangeListener<Consumer, State> stateListener =
- (StateChangeListener<Consumer, State>) getStateListener();
+ final StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> stateListener = getStateListener();
if(stateListener != null)
{
stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState));
@@ -251,12 +251,12 @@ class QueueConsumer<T extends ConsumerTa
return STATE_MAP.get(_target.getState());
}
- public final SimpleAMQQueue getQueue()
+ public final Q getQueue()
{
return _queue;
}
- final void setQueue(SimpleAMQQueue queue, boolean exclusive)
+ final void setQueue(Q queue, boolean exclusive)
{
if(getQueue() != null)
{
@@ -300,9 +300,9 @@ class QueueConsumer<T extends ConsumerTa
getQueue().flushConsumer(this);
}
- boolean resend(final MessageInstance entry) throws AMQException
+ boolean resend(final E entry) throws AMQException
{
- return getQueue().resend((QueueEntry)entry, this);
+ return getQueue().resend(entry, this);
}
final SubFlushRunner getRunner()
@@ -315,31 +315,26 @@ class QueueConsumer<T extends ConsumerTa
return _id;
}
- public final StateChangeListener<? extends Consumer, State> getStateListener()
+ public final StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> getStateListener()
{
return _stateListener;
}
- public final void setStateListener(StateChangeListener<? extends Consumer, State> listener)
+ public final void setStateListener(StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> listener)
{
_stateListener = listener;
}
- final QueueContext getQueueContext()
+ final QueueContext<E,Q,L> getQueueContext()
{
return _queueContext;
}
- final void setQueueContext(QueueContext queueContext)
+ final void setQueueContext(QueueContext<E,Q,L> queueContext)
{
_queueContext = queueContext;
}
- protected boolean updateState(State from, State to)
- {
- return _state.compareAndSet(from, to);
- }
-
public final boolean isActive()
{
return getState() == State.ACTIVE;
@@ -355,7 +350,7 @@ class QueueConsumer<T extends ConsumerTa
_noLocal = noLocal;
}
- public final boolean hasInterest(MessageInstance entry)
+ public final boolean hasInterest(E entry)
{
//check that the message hasn't been rejected
if (entry.isRejectedBy(this))
@@ -405,7 +400,6 @@ class QueueConsumer<T extends ConsumerTa
filterLogString.append(delimiter);
}
filterLogString.append("Browser");
- hasEntries = true;
}
return filterLogString.toString();
@@ -431,7 +425,7 @@ class QueueConsumer<T extends ConsumerTa
return _createTime;
}
- final MessageInstance.ConsumerAcquiredState getOwningState()
+ final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> getOwningState()
{
return _owningState;
}
@@ -466,7 +460,7 @@ class QueueConsumer<T extends ConsumerTa
return _deliveredCount.longValue();
}
- final void send(final QueueEntry entry, final boolean batch) throws AMQException
+ final void send(final E entry, final boolean batch) throws AMQException
{
_deliveredCount.incrementAndGet();
ServerMessage message = entry.getMessage();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java Sat Feb 8 17:52:05 2014
@@ -24,19 +24,19 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-class QueueConsumerList
+class QueueConsumerList<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
{
- private final ConsumerNode _head = new ConsumerNode();
+ private final ConsumerNode<E,Q,L> _head = new ConsumerNode<E,Q,L>();
- private final AtomicReference<ConsumerNode> _tail = new AtomicReference<ConsumerNode>(_head);
- private final AtomicReference<ConsumerNode> _subNodeMarker = new AtomicReference<ConsumerNode>(_head);
+ private final AtomicReference<ConsumerNode<E,Q,L>> _tail = new AtomicReference<ConsumerNode<E,Q,L>>(_head);
+ private final AtomicReference<ConsumerNode<E,Q,L>> _subNodeMarker = new AtomicReference<ConsumerNode<E,Q,L>>(_head);
private final AtomicInteger _size = new AtomicInteger();
- public static final class ConsumerNode
+ public static final class ConsumerNode<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
{
private final AtomicBoolean _deleted = new AtomicBoolean();
- private final AtomicReference<ConsumerNode> _next = new AtomicReference<ConsumerNode>();
- private final QueueConsumer _sub;
+ private final AtomicReference<ConsumerNode<E,Q,L>> _next = new AtomicReference<ConsumerNode<E,Q,L>>();
+ private final QueueConsumer<?,E,Q,L> _sub;
public ConsumerNode()
{
@@ -45,7 +45,7 @@ class QueueConsumerList
_deleted.set(true);
}
- public ConsumerNode(final QueueConsumer sub)
+ public ConsumerNode(final QueueConsumer<?,E,Q,L> sub)
{
//used for regular node construction
_sub = sub;
@@ -57,12 +57,12 @@ class QueueConsumerList
*
* @return the next non-deleted node, or null if none was found.
*/
- public ConsumerNode findNext()
+ public ConsumerNode<E,Q,L> findNext()
{
- ConsumerNode next = nextNode();
+ ConsumerNode<E,Q,L> next = nextNode();
while(next != null && next.isDeleted())
{
- final ConsumerNode newNext = next.nextNode();
+ final ConsumerNode<E,Q,L> newNext = next.nextNode();
if(newNext != null)
{
//try to move our _next reference forward to the 'newNext'
@@ -86,7 +86,7 @@ class QueueConsumerList
*
* @return the immediately next node in the structure, or null if at the tail.
*/
- protected ConsumerNode nextNode()
+ protected ConsumerNode<E,Q,L> nextNode()
{
return _next.get();
}
@@ -97,7 +97,7 @@ class QueueConsumerList
* @param node the ConsumerNode to set as 'next'
* @return whether the operation succeeded
*/
- private boolean setNext(final ConsumerNode node)
+ private boolean setNext(final ConsumerNode<E,Q,L> node)
{
return _next.compareAndSet(null, node);
}
@@ -112,18 +112,18 @@ class QueueConsumerList
return _deleted.compareAndSet(false,true);
}
- public QueueConsumer getConsumer()
+ public QueueConsumer<?,E,Q,L> getConsumer()
{
return _sub;
}
}
- private void insert(final ConsumerNode node, final boolean count)
+ private void insert(final ConsumerNode<E,Q,L> node, final boolean count)
{
for (;;)
{
- ConsumerNode tail = _tail.get();
- ConsumerNode next = tail.nextNode();
+ ConsumerNode<E,Q,L> tail = _tail.get();
+ ConsumerNode<E,Q,L> next = tail.nextNode();
if (tail == _tail.get())
{
if (next == null)
@@ -146,16 +146,16 @@ class QueueConsumerList
}
}
- public void add(final QueueConsumer sub)
+ public void add(final QueueConsumer<?,E,Q,L> sub)
{
- ConsumerNode node = new ConsumerNode(sub);
+ ConsumerNode<E,Q,L> node = new ConsumerNode<E,Q,L>(sub);
insert(node, true);
}
- public boolean remove(final QueueConsumer sub)
+ public boolean remove(final QueueConsumer<?, E,Q,L> sub)
{
- ConsumerNode prevNode = _head;
- ConsumerNode node = _head.nextNode();
+ ConsumerNode<E,Q,L> prevNode = _head;
+ ConsumerNode<E,Q,L> node = _head.nextNode();
while(node != null)
{
@@ -170,7 +170,7 @@ class QueueConsumerList
//correctness reasons, however we have just 'deleted'
//the tail. Inserting an empty dummy node after it will
//let us scavenge the node containing the Consumer.
- insert(new ConsumerNode(), false);
+ insert(new ConsumerNode<E,Q,L>(), false);
}
//advance the next node reference in the 'prevNode' to scavenge
@@ -189,9 +189,9 @@ class QueueConsumerList
return false;
}
- private void nodeMarkerCleanup(final ConsumerNode node)
+ private void nodeMarkerCleanup(final ConsumerNode<E,Q,L> node)
{
- ConsumerNode markedNode = _subNodeMarker.get();
+ ConsumerNode<E,Q,L> markedNode = _subNodeMarker.get();
if(node == markedNode)
{
//if the marked node is the one we are removing, then
@@ -200,7 +200,7 @@ class QueueConsumerList
//into the list and find the next node to use.
//Because we inserted a dummy if node was the
//tail, markedNode.nextNode() can never be null.
- ConsumerNode dummy = new ConsumerNode();
+ ConsumerNode<E,Q,L> dummy = new ConsumerNode<E,Q,L>();
dummy.setNext(markedNode.nextNode());
//if the CAS fails the marked node has changed, thus
@@ -219,7 +219,7 @@ class QueueConsumerList
}
}
- public boolean updateMarkedNode(final ConsumerNode expected, final ConsumerNode nextNode)
+ public boolean updateMarkedNode(final ConsumerNode<E,Q,L> expected, final ConsumerNode<E,Q,L> nextNode)
{
return _subNodeMarker.compareAndSet(expected, nextNode);
}
@@ -231,41 +231,41 @@ class QueueConsumerList
*
* @return the previously marked node (or a dummy if it was subsequently deleted)
*/
- public ConsumerNode getMarkedNode()
+ public ConsumerNode<E,Q,L> getMarkedNode()
{
return _subNodeMarker.get();
}
- public static class ConsumerNodeIterator
+ public static class ConsumerNodeIterator<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
{
- private ConsumerNode _lastNode;
+ private ConsumerNode<E,Q,L> _lastNode;
- ConsumerNodeIterator(ConsumerNode startNode)
+ ConsumerNodeIterator(ConsumerNode<E,Q,L> startNode)
{
_lastNode = startNode;
}
- public ConsumerNode getNode()
+ public ConsumerNode<E,Q,L> getNode()
{
return _lastNode;
}
public boolean advance()
{
- ConsumerNode nextNode = _lastNode.findNext();
+ ConsumerNode<E,Q,L> nextNode = _lastNode.findNext();
_lastNode = nextNode;
return _lastNode != null;
}
}
- public ConsumerNodeIterator iterator()
+ public ConsumerNodeIterator<E,Q,L> iterator()
{
- return new ConsumerNodeIterator(_head);
+ return new ConsumerNodeIterator<E,Q,L>(_head);
}
- public ConsumerNode getHead()
+ public ConsumerNode<E,Q,L> getHead()
{
return _head;
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java Sat Feb 8 17:52:05 2014
@@ -23,32 +23,32 @@ package org.apache.qpid.server.queue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-final class QueueContext
+final class QueueContext<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
{
- private volatile QueueEntry _lastSeenEntry;
- private volatile QueueEntry _releasedEntry;
+ private volatile E _lastSeenEntry;
+ private volatile E _releasedEntry;
- static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+ static final AtomicReferenceFieldUpdater<QueueContext, QueueEntryImpl>
_lastSeenUpdater =
AtomicReferenceFieldUpdater.newUpdater
- (QueueContext.class, QueueEntry.class, "_lastSeenEntry");
- static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+ (QueueContext.class, QueueEntryImpl.class, "_lastSeenEntry");
+ static final AtomicReferenceFieldUpdater<QueueContext, QueueEntryImpl>
_releasedUpdater =
AtomicReferenceFieldUpdater.newUpdater
- (QueueContext.class, QueueEntry.class, "_releasedEntry");
+ (QueueContext.class, QueueEntryImpl.class, "_releasedEntry");
- public QueueContext(QueueEntry head)
+ public QueueContext(E head)
{
_lastSeenEntry = head;
}
- public QueueEntry getLastSeenEntry()
+ public E getLastSeenEntry()
{
return _lastSeenEntry;
}
- QueueEntry getReleasedEntry()
+ E getReleasedEntry()
{
return _releasedEntry;
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Sat Feb 8 17:52:05 2014
@@ -20,20 +20,21 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.message.MessageInstance;
-public interface QueueEntry extends MessageInstance<QueueConsumer>, Comparable<QueueEntry>
+public interface QueueEntry<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, C extends Consumer> extends MessageInstance<E,C>, Comparable<E>
{
- AMQQueue<QueueConsumer> getQueue();
+ Q getQueue();
long getSize();
boolean isQueueDeleted();
- QueueEntry getNextNode();
+ E getNextNode();
- QueueEntry getNextValidEntry();
+ E getNextValidEntry();
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Sat Feb 8 17:52:05 2014
@@ -44,11 +44,11 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-public abstract class QueueEntryImpl implements QueueEntry
+public abstract class QueueEntryImpl<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements QueueEntry<E,Q,QueueConsumer<?,E,Q,L>>
{
private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
- private final QueueEntryList _queueEntryList;
+ private final L _queueEntryList;
private final MessageReference _message;
@@ -63,7 +63,7 @@ public abstract class QueueEntryImpl imp
(QueueEntryImpl.class, EntryState.class, "_state");
- private volatile Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> _stateChangeListeners;
+ private volatile Set<StateChangeListener<? super E, State>> _stateChangeListeners;
private static final
AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
@@ -90,14 +90,14 @@ public abstract class QueueEntryImpl imp
private boolean _deliveredToConsumer;
- public QueueEntryImpl(QueueEntryList<?> queueEntryList)
+ public QueueEntryImpl(L queueEntryList)
{
this(queueEntryList,null,Long.MIN_VALUE);
_state = DELETED_STATE;
}
- public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message, final long entryId)
+ public QueueEntryImpl(L queueEntryList, ServerMessage message, final long entryId)
{
_queueEntryList = queueEntryList;
@@ -107,7 +107,7 @@ public abstract class QueueEntryImpl imp
populateInstanceProperties();
}
- public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message)
+ public QueueEntryImpl(L queueEntryList, ServerMessage message)
{
_queueEntryList = queueEntryList;
_message = message == null ? null : message.newReference();
@@ -138,7 +138,7 @@ public abstract class QueueEntryImpl imp
return _entryId;
}
- public AMQQueue<QueueConsumer> getQueue()
+ public Q getQueue()
{
return _queueEntryList.getQueue();
}
@@ -234,12 +234,12 @@ public abstract class QueueEntryImpl imp
if(state instanceof ConsumerAcquiredState)
{
- getQueue().decrementUnackedMsgCount(this);
+ getQueue().decrementUnackedMsgCount((E) this);
}
if(!getQueue().isDeleted())
{
- getQueue().requeue(this);
+ getQueue().requeue((E)this);
if(_stateChangeListeners != null)
{
notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
@@ -315,13 +315,12 @@ public abstract class QueueEntryImpl imp
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
- Consumer s = null;
if (state instanceof ConsumerAcquiredState)
{
- getQueue().decrementUnackedMsgCount(this);
+ getQueue().decrementUnackedMsgCount((E) this);
}
- getQueue().dequeue(this,s);
+ getQueue().dequeue((E)this);
if(_stateChangeListeners != null)
{
notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
@@ -333,9 +332,9 @@ public abstract class QueueEntryImpl imp
private void notifyStateChange(final State oldState, final State newState)
{
- for(StateChangeListener<MessageInstance<QueueConsumer>, State> l : _stateChangeListeners)
+ for(StateChangeListener<? super E, State> l : _stateChangeListeners)
{
- l.stateChanged(this, oldState, newState);
+ l.stateChanged((E)this, oldState, newState);
}
}
@@ -345,7 +344,7 @@ public abstract class QueueEntryImpl imp
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
- _queueEntryList.entryDeleted(this);
+ _queueEntryList.entryDeleted((E)this);
onDelete();
_message.release();
@@ -364,7 +363,7 @@ public abstract class QueueEntryImpl imp
dispose();
}
- public int routeToAlternate(final Action<MessageInstance<? extends Consumer>> action, ServerTransaction txn)
+ public int routeToAlternate(final Action<? super MessageInstance<?, ? extends Consumer>> action, ServerTransaction txn)
{
final AMQQueue currentQueue = getQueue();
Exchange alternateExchange = currentQueue.getAlternateExchange();
@@ -412,21 +411,21 @@ public abstract class QueueEntryImpl imp
return getQueue().isDeleted();
}
- public void addStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener)
+ public void addStateChangeListener(StateChangeListener<? super E,State> listener)
{
- Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> listeners = _stateChangeListeners;
+ Set<StateChangeListener<? super E, State>> listeners = _stateChangeListeners;
if(listeners == null)
{
- _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<MessageInstance<QueueConsumer>, State>>());
+ _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<? super E, State>>());
listeners = _stateChangeListeners;
}
listeners.add(listener);
}
- public boolean removeStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener)
+ public boolean removeStateChangeListener(StateChangeListener<? super E, State> listener)
{
- Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> listeners = _stateChangeListeners;
+ Set<StateChangeListener<? super E, State>> listeners = _stateChangeListeners;
if(listeners != null)
{
return listeners.remove(listener);
@@ -436,9 +435,9 @@ public abstract class QueueEntryImpl imp
}
- public int compareTo(final QueueEntry o)
+ public int compareTo(final E o)
{
- QueueEntryImpl other = (QueueEntryImpl)o;
+ E other = o;
return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
}
@@ -446,7 +445,7 @@ public abstract class QueueEntryImpl imp
{
}
- public QueueEntryList getQueueEntryList()
+ public L getQueueEntryList()
{
return _queueEntryList;
}
@@ -494,10 +493,10 @@ public abstract class QueueEntryImpl imp
@Override
public boolean resend() throws AMQException
{
- QueueConsumer sub = getDeliveredConsumer();
+ QueueConsumer<?,E,Q,L> sub = getDeliveredConsumer();
if(sub != null)
{
- return sub.resend(this);
+ return sub.resend((E)this);
}
return false;
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java Sat Feb 8 17:52:05 2014
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.queue;
-public interface QueueEntryIterator<QE extends QueueEntry>
+import org.apache.qpid.server.consumer.Consumer;
+
+public interface QueueEntryIterator<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer>
{
boolean atTail();
- QE getNode();
+ E getNode();
boolean advance();
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Sat Feb 8 17:52:05 2014
@@ -20,21 +20,23 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.message.ServerMessage;
-public interface QueueEntryList<Q extends QueueEntry>
+public interface QueueEntryList<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer>
{
- AMQQueue<QueueConsumer> getQueue();
+ Q getQueue();
- Q add(ServerMessage message);
+ E add(ServerMessage message);
- Q next(Q node);
+ E next(E node);
- QueueEntryIterator<Q> iterator();
+ QueueEntryIterator<E,Q,L,C> iterator();
- Q getHead();
+ E getHead();
- void entryDeleted(Q queueEntry);
+ void entryDeleted(E queueEntry);
int getPriorities();
+
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java Sat Feb 8 17:52:05 2014
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.queue;
-interface QueueEntryListFactory
+interface QueueEntryListFactory<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
{
- public QueueEntryList createQueueEntryList(AMQQueue queue);
+ public L createQueueEntryList(Q queue);
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java Sat Feb 8 17:52:05 2014
@@ -16,7 +16,9 @@
*/
package org.apache.qpid.server.queue;
-public interface QueueEntryVisitor
+import org.apache.qpid.server.consumer.Consumer;
+
+public interface QueueEntryVisitor<E extends QueueEntry>
{
- boolean visit(QueueEntry entry);
+ boolean visit(E entry);
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Sat Feb 8 17:52:05 2014
@@ -43,7 +43,6 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -52,6 +51,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -59,9 +59,9 @@ import org.apache.qpid.server.util.Actio
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
- StateChangeListener<QueueConsumer, QueueConsumer.State>,
- MessageGroupManager.ConsumerResetHelper
+abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements AMQQueue<E, Q, QueueConsumer<?,E,Q,L>>,
+ StateChangeListener<QueueConsumer<?,E,Q,L>, QueueConsumer.State>,
+ MessageGroupManager.ConsumerResetHelper<E,Q,L>
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
@@ -94,11 +94,11 @@ public class SimpleAMQQueue implements A
private Exchange _alternateExchange;
- private final QueueEntryList<QueueEntry> _entries;
+ private final L _entries;
- private final QueueConsumerList _consumerList = new QueueConsumerList();
+ private final QueueConsumerList<E,Q,L> _consumerList = new QueueConsumerList<E,Q,L>();
- private volatile QueueConsumer _exclusiveSubscriber;
+ private volatile QueueConsumer<?,E,Q,L> _exclusiveSubscriber;
@@ -163,8 +163,7 @@ public class SimpleAMQQueue implements A
private LogSubject _logSubject;
private LogActor _logActor;
- private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER";
- private boolean _nolocal;
+ private boolean _noLocal;
private final AtomicBoolean _overfull = new AtomicBoolean(false);
private boolean _deleteOnNoConsumers;
@@ -177,20 +176,15 @@ public class SimpleAMQQueue implements A
/** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
private int _maximumDeliveryCount;
- private final MessageGroupManager _messageGroupManager;
+ private final MessageGroupManager<E,Q,L> _messageGroupManager;
- private final Collection<ConsumerRegistrationListener> _consumerListeners =
- new ArrayList<ConsumerRegistrationListener>();
+ private final Collection<ConsumerRegistrationListener<Q>> _consumerListeners =
+ new ArrayList<ConsumerRegistrationListener<Q>>();
private AMQQueue.NotificationListener _notificationListener;
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
- public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments)
- {
- this(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments);
- }
-
protected SimpleAMQQueue(UUID id,
String name,
boolean durable,
@@ -198,7 +192,7 @@ public class SimpleAMQQueue implements A
boolean autoDelete,
boolean exclusive,
VirtualHost virtualHost,
- QueueEntryListFactory entryListFactory, Map<String,Object> arguments)
+ QueueEntryListFactory<E,Q,L> entryListFactory, Map<String,Object> arguments)
{
if (name == null)
@@ -217,7 +211,7 @@ public class SimpleAMQQueue implements A
_autoDelete = autoDelete;
_exclusive = exclusive;
_virtualHost = virtualHost;
- _entries = entryListFactory.createQueueEntryList(this);
+ _entries = entryListFactory.createQueueEntryList((Q)this);
_arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments));
_id = id;
@@ -243,13 +237,13 @@ public class SimpleAMQQueue implements A
{
Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
_messageGroupManager =
- new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
+ new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
this);
}
else
{
- _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(arguments.get(
+ _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(
Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
}
}
@@ -281,21 +275,20 @@ public class SimpleAMQQueue implements A
}
catch (RejectedExecutionException ree)
{
- if (_stopped.get())
- {
- // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped.
- }
- else
+ // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped.
+ if(!_stopped.get())
{
_logger.error("Unexpected rejected execution", ree);
throw ree;
+
}
+
}
}
public void setNoLocal(boolean nolocal)
{
- _nolocal = nolocal;
+ _noLocal = nolocal;
}
public UUID getId()
@@ -384,7 +377,7 @@ public class SimpleAMQQueue implements A
@Override
- public synchronized QueueConsumer addConsumer(final ConsumerTarget target,
+ public synchronized <T extends ConsumerTarget> QueueConsumer<T,E,Q,L> addConsumer(final T target,
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
@@ -412,10 +405,10 @@ public class SimpleAMQQueue implements A
throw new ExistingConsumerPreventsExclusive();
}
- QueueConsumer consumer = new QueueConsumer(filters, messageClass,
- optionSet.contains(Consumer.Option.ACQUIRES),
- optionSet.contains(Consumer.Option.SEES_REQUEUES),
- consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target);
+ QueueConsumer<T,E,Q,L> consumer = new QueueConsumer<T,E,Q,L>(filters, messageClass,
+ optionSet.contains(Consumer.Option.ACQUIRES),
+ optionSet.contains(Consumer.Option.SEES_REQUEUES),
+ consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target);
target.consumerAdded(consumer);
@@ -430,21 +423,21 @@ public class SimpleAMQQueue implements A
}
consumer.setStateListener(this);
- consumer.setQueueContext(new QueueContext(_entries.getHead()));
+ consumer.setQueueContext(new QueueContext<E,Q,L>(_entries.getHead()));
if (!isDeleted())
{
- consumer.setQueue(this, exclusive);
- if(_nolocal)
+ consumer.setQueue((Q)this, exclusive);
+ if(_noLocal)
{
- consumer.setNoLocal(_nolocal);
+ consumer.setNoLocal(true);
}
synchronized (_consumerListeners)
{
- for(ConsumerRegistrationListener listener : _consumerListeners)
+ for(ConsumerRegistrationListener<Q> listener : _consumerListeners)
{
- listener.consumerAdded(this, consumer);
+ listener.consumerAdded((Q)this, consumer);
}
}
@@ -466,7 +459,7 @@ public class SimpleAMQQueue implements A
}
- synchronized void unregisterConsumer(final QueueConsumer consumer) throws AMQException
+ synchronized void unregisterConsumer(final QueueConsumer<?,E,Q,L> consumer) throws AMQException
{
if (consumer == null)
{
@@ -494,9 +487,9 @@ public class SimpleAMQQueue implements A
synchronized (_consumerListeners)
{
- for(ConsumerRegistrationListener listener : _consumerListeners)
+ for(ConsumerRegistrationListener<Q> listener : _consumerListeners)
{
- listener.consumerRemoved(this, consumer);
+ listener.consumerRemoved((Q)this, consumer);
}
}
@@ -519,10 +512,10 @@ public class SimpleAMQQueue implements A
}
- public Collection<QueueConsumer> getConsumers()
+ public Collection<QueueConsumer<?,E,Q,L>> getConsumers()
{
- List<QueueConsumer> consumers = new ArrayList<QueueConsumer>();
- QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator();
+ List<QueueConsumer<?,E,Q,L>> consumers = new ArrayList<QueueConsumer<?,E,Q,L>>();
+ QueueConsumerList.ConsumerNodeIterator<E,Q,L> iter = _consumerList.iterator();
while(iter.advance())
{
consumers.add(iter.getNode().getConsumer());
@@ -531,7 +524,7 @@ public class SimpleAMQQueue implements A
}
- public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener)
+ public void addConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener)
{
synchronized (_consumerListeners)
{
@@ -539,7 +532,7 @@ public class SimpleAMQQueue implements A
}
}
- public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener)
+ public void removeConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener)
{
synchronized (_consumerListeners)
{
@@ -547,9 +540,9 @@ public class SimpleAMQQueue implements A
}
}
- public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments)
+ public void resetSubPointersForGroups(QueueConsumer<?,E,Q,L> consumer, boolean clearAssignments)
{
- QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
+ E entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
if(clearAssignments)
{
_messageGroupManager.clearAssignments(consumer);
@@ -557,11 +550,11 @@ public class SimpleAMQQueue implements A
if(entry != null)
{
- QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
+ QueueConsumerList.ConsumerNodeIterator<E,Q,L> subscriberIter = _consumerList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance())
{
- QueueConsumer sub = subscriberIter.getNode().getConsumer();
+ QueueConsumer<?,E,Q,L> sub = subscriberIter.getNode().getConsumer();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -599,11 +592,6 @@ public class SimpleAMQQueue implements A
}
}
- public int getBindingCountHigh()
- {
- return _bindingCountHigh.get();
- }
-
public void removeBinding(final Binding binding)
{
_bindings.remove(binding);
@@ -626,7 +614,7 @@ public class SimpleAMQQueue implements A
// ------ Enqueue / Dequeue
- public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException
+ public void enqueue(ServerMessage message, Action<? super MessageInstance<?, QueueConsumer<?,E,Q,L>>> action) throws AMQException
{
incrementQueueCount();
incrementQueueSize(message);
@@ -634,8 +622,8 @@ public class SimpleAMQQueue implements A
_totalMessagesReceived.incrementAndGet();
- QueueEntry entry;
- final QueueConsumer exclusiveSub = _exclusiveSubscriber;
+ E entry;
+ final QueueConsumer<?,E,Q,L> exclusiveSub = _exclusiveSubscriber;
entry = _entries.add(message);
if(action != null || (exclusiveSub == null && _queueRunner.isIdle()))
@@ -645,8 +633,8 @@ public class SimpleAMQQueue implements A
iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message
*/
- QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode();
- QueueConsumerList.ConsumerNode nextNode = node.findNext();
+ QueueConsumerList.ConsumerNode<E,Q,L> node = _consumerList.getMarkedNode();
+ QueueConsumerList.ConsumerNode<E,Q,L> nextNode = node.findNext();
if (nextNode == null)
{
nextNode = _consumerList.getHead().findNext();
@@ -682,7 +670,7 @@ public class SimpleAMQQueue implements A
else
{
// if consumer at end, and active, offer
- QueueConsumer sub = nextNode.getConsumer();
+ QueueConsumer<?,E,Q,L> sub = nextNode.getConsumer();
deliverToConsumer(sub, entry);
}
nextNode = nextNode.findNext();
@@ -714,7 +702,7 @@ public class SimpleAMQQueue implements A
}
- private void deliverToConsumer(final QueueConsumer sub, final QueueEntry entry)
+ private void deliverToConsumer(final QueueConsumer<?,E,Q,L> sub, final E entry)
throws AMQException
{
@@ -746,7 +734,7 @@ public class SimpleAMQQueue implements A
}
}
- private boolean assign(final QueueConsumer sub, final QueueEntry entry)
+ private boolean assign(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
if(_messageGroupManager == null)
{
@@ -760,7 +748,7 @@ public class SimpleAMQQueue implements A
}
}
- private boolean mightAssign(final QueueConsumer sub, final QueueEntry entry)
+ private boolean mightAssign(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
if(_messageGroupManager == null || !sub.acquires())
{
@@ -770,7 +758,7 @@ public class SimpleAMQQueue implements A
return (assigned == null) || (assigned == sub);
}
- protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
+ protected void checkConsumersNotAheadOfDelivery(final E entry)
{
// This method is only required for queues which mess with ordering
// Simple Queues don't :-)
@@ -804,7 +792,7 @@ public class SimpleAMQQueue implements A
getAtomicQueueCount().incrementAndGet();
}
- private void deliverMessage(final QueueConsumer sub, final QueueEntry entry, boolean batch)
+ private void deliverMessage(final QueueConsumer<?,E,Q,L> sub, final E entry, boolean batch)
throws AMQException
{
setLastSeenEntry(sub, entry);
@@ -815,18 +803,18 @@ public class SimpleAMQQueue implements A
sub.send(entry, batch);
}
- private boolean consumerReadyAndHasInterest(final QueueConsumer sub, final QueueEntry entry) throws AMQException
+ private boolean consumerReadyAndHasInterest(final QueueConsumer<?,E,Q,L> sub, final E entry) throws AMQException
{
return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
}
- private void setLastSeenEntry(final QueueConsumer sub, final QueueEntry entry)
+ private void setLastSeenEntry(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
- QueueContext subContext = sub.getQueueContext();
+ QueueContext<E,Q,L> subContext = sub.getQueueContext();
if (subContext != null)
{
- QueueEntry releasedEntry = subContext.getReleasedEntry();
+ E releasedEntry = subContext.getReleasedEntry();
QueueContext._lastSeenUpdater.set(subContext, entry);
if(releasedEntry == entry)
@@ -836,13 +824,13 @@ public class SimpleAMQQueue implements A
}
}
- private void updateSubRequeueEntry(final QueueConsumer sub, final QueueEntry entry)
+ private void updateSubRequeueEntry(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
- QueueContext subContext = sub.getQueueContext();
+ QueueContext<E,Q,L> subContext = sub.getQueueContext();
if(subContext != null)
{
- QueueEntry oldEntry;
+ E oldEntry;
while((oldEntry = subContext.getReleasedEntry()) == null || oldEntry.compareTo(entry) > 0)
{
@@ -854,13 +842,13 @@ public class SimpleAMQQueue implements A
}
}
- public void requeue(QueueEntry entry)
+ public void requeue(E entry)
{
- QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
+ QueueConsumerList.ConsumerNodeIterator<E,Q,L> subscriberIter = _consumerList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance() && entry.isAvailable())
{
- QueueConsumer sub = subscriberIter.getNode().getConsumer();
+ QueueConsumer<?,E,Q,L> sub = subscriberIter.getNode().getConsumer();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -873,7 +861,8 @@ public class SimpleAMQQueue implements A
}
- public void dequeue(QueueEntry entry, Consumer sub)
+ @Override
+ public void dequeue(E entry)
{
decrementQueueCount();
decrementQueueSize(entry);
@@ -886,7 +875,7 @@ public class SimpleAMQQueue implements A
}
- private void decrementQueueSize(final QueueEntry entry)
+ private void decrementQueueSize(final E entry)
{
final ServerMessage message = entry.getMessage();
long size = message.getSize();
@@ -905,7 +894,7 @@ public class SimpleAMQQueue implements A
_dequeueCount.incrementAndGet();
}
- public boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException
+ public boolean resend(final E entry, final QueueConsumer<?,E,Q,L> consumer) throws AMQException
{
/* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message
entry to resend and move back the consumer pointer. */
@@ -915,7 +904,7 @@ public class SimpleAMQQueue implements A
{
if (!consumer.isClosed())
{
- deliverMessage((QueueConsumer) consumer, entry, false);
+ deliverMessage(consumer, entry, false);
return true;
}
else
@@ -981,11 +970,11 @@ public class SimpleAMQQueue implements A
public long getOldestMessageArrivalTime()
{
- QueueEntry entry = getOldestQueueEntry();
+ E entry = getOldestQueueEntry();
return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
}
- protected QueueEntry getOldestQueueEntry()
+ protected E getOldestQueueEntry()
{
return _entries.next(_entries.getHead());
}
@@ -995,13 +984,13 @@ public class SimpleAMQQueue implements A
return _deleted.get();
}
- public List<QueueEntry> getMessagesOnTheQueue()
+ public List<E> getMessagesOnTheQueue()
{
- ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
- QueueEntryIterator queueListIterator = _entries.iterator();
+ ArrayList<E> entryList = new ArrayList<E>();
+ QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
while (queueListIterator.advance())
{
- QueueEntry node = queueListIterator.getNode();
+ E node = queueListIterator.getNode();
if (node != null && !node.isDeleted())
{
entryList.add(node);
@@ -1011,7 +1000,7 @@ public class SimpleAMQQueue implements A
}
- public void stateChanged(QueueConsumer sub, QueueConsumer.State oldState, QueueConsumer.State newState)
+ public void stateChanged(QueueConsumer<?,E,Q,L> sub, QueueConsumer.State oldState, QueueConsumer.State newState)
{
if (oldState == QueueConsumer.State.ACTIVE && newState != QueueConsumer.State.ACTIVE)
{
@@ -1029,7 +1018,7 @@ public class SimpleAMQQueue implements A
}
}
- public int compareTo(final AMQQueue o)
+ public int compareTo(final Q o)
{
return _name.compareTo(o.getName());
}
@@ -1049,7 +1038,7 @@ public class SimpleAMQQueue implements A
return _exclusiveSubscriber != null;
}
- private void setExclusiveSubscriber(QueueConsumer exclusiveSubscriber)
+ private void setExclusiveSubscriber(QueueConsumer<?,E,Q,L> exclusiveSubscriber)
{
_exclusiveSubscriber = exclusiveSubscriber;
}
@@ -1060,32 +1049,32 @@ public class SimpleAMQQueue implements A
}
/** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
- protected QueueEntryList getEntries()
+ protected L getEntries()
{
return _entries;
}
- protected QueueConsumerList getConsumerList()
+ protected QueueConsumerList<E,Q,L> getConsumerList()
{
return _consumerList;
}
- public static interface QueueEntryFilter
+ public static interface QueueEntryFilter<E extends QueueEntry>
{
- public boolean accept(QueueEntry entry);
+ public boolean accept(E entry);
public boolean filterComplete();
}
- public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
+ public List<E> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
{
- return getMessagesOnTheQueue(new QueueEntryFilter()
+ return getMessagesOnTheQueue(new QueueEntryFilter<E>()
{
- public boolean accept(QueueEntry entry)
+ public boolean accept(E entry)
{
final long messageId = entry.getMessage().getMessageNumber();
return messageId >= fromMessageId && messageId <= toMessageId;
@@ -1098,13 +1087,13 @@ public class SimpleAMQQueue implements A
});
}
- public QueueEntry getMessageOnTheQueue(final long messageId)
+ public E getMessageOnTheQueue(final long messageId)
{
- List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+ List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>()
{
private boolean _complete;
- public boolean accept(QueueEntry entry)
+ public boolean accept(E entry)
{
_complete = entry.getMessage().getMessageNumber() == messageId;
return _complete;
@@ -1118,13 +1107,13 @@ public class SimpleAMQQueue implements A
return entries.isEmpty() ? null : entries.get(0);
}
- public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter)
+ public List<E> getMessagesOnTheQueue(QueueEntryFilter<E> filter)
{
- ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
- QueueEntryIterator queueListIterator = _entries.iterator();
+ ArrayList<E> entryList = new ArrayList<E>();
+ QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
while (queueListIterator.advance() && !filter.filterComplete())
{
- QueueEntry node = queueListIterator.getNode();
+ E node = queueListIterator.getNode();
if (!node.isDeleted() && filter.accept(node))
{
entryList.add(node);
@@ -1134,13 +1123,13 @@ public class SimpleAMQQueue implements A
}
- public void visit(final QueueEntryVisitor visitor)
+ public void visit(final QueueEntryVisitor<E> visitor)
{
- QueueEntryIterator queueListIterator = _entries.iterator();
+ QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
while(queueListIterator.advance())
{
- QueueEntry node = queueListIterator.getNode();
+ E node = queueListIterator.getNode();
if(!node.isDeleted())
{
@@ -1157,17 +1146,17 @@ public class SimpleAMQQueue implements A
*
* The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1.
* Using 0 in the 'to' field will return an empty list regardless of the 'from' value.
- * @param fromPosition
- * @param toPosition
- * @return
+ * @param fromPosition first message position
+ * @param toPosition last message position
+ * @return list of messages
*/
- public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition)
+ public List<E> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition)
{
- return getMessagesOnTheQueue(new QueueEntryFilter()
+ return getMessagesOnTheQueue(new QueueEntryFilter<E>()
{
private long position = 0;
- public boolean accept(QueueEntry entry)
+ public boolean accept(E entry)
{
position++;
return (position >= fromPosition) && (position <= toPosition);
@@ -1196,12 +1185,12 @@ public class SimpleAMQQueue implements A
// TODO - now only used by the tests
public void deleteMessageFromTop()
{
- QueueEntryIterator queueListIterator = _entries.iterator();
+ QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
boolean noDeletes = true;
while (noDeletes && queueListIterator.advance())
{
- QueueEntry node = queueListIterator.getNode();
+ E node = queueListIterator.getNode();
if (node.acquire())
{
dequeueEntry(node);
@@ -1224,14 +1213,14 @@ public class SimpleAMQQueue implements A
throw new AMQSecurityException("Permission denied: queue " + getName());
}
- QueueEntryIterator queueListIterator = _entries.iterator();
+ QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
long count = 0;
ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
while (queueListIterator.advance())
{
- QueueEntry node = queueListIterator.getNode();
+ E node = queueListIterator.getNode();
if (node.acquire())
{
dequeueEntry(node, txn);
@@ -1248,13 +1237,13 @@ public class SimpleAMQQueue implements A
return count;
}
- private void dequeueEntry(final QueueEntry node)
+ private void dequeueEntry(final E node)
{
ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore());
dequeueEntry(node, txn);
}
- private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
+ private void dequeueEntry(final E node, ServerTransaction txn)
{
txn.dequeue(this, node.getMessage(),
new ServerTransaction.Action()
@@ -1283,7 +1272,7 @@ public class SimpleAMQQueue implements A
}
// TODO list all thrown exceptions
- public int delete() throws AMQSecurityException, AMQException
+ public int delete() throws AMQException
{
// Check access
if (!_virtualHost.getSecurityManager().authoriseDelete(this))
@@ -1313,10 +1302,10 @@ public class SimpleAMQQueue implements A
}
- List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+ List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>()
{
- public boolean accept(QueueEntry entry)
+ public boolean accept(E entry)
{
return entry.acquire();
}
@@ -1330,7 +1319,7 @@ public class SimpleAMQQueue implements A
ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
- for(final QueueEntry entry : entries)
+ for(final E entry : entries)
{
// TODO log requeues with a post enqueue action
int requeues = entry.routeToAlternate(null, txn);
@@ -1435,7 +1424,7 @@ public class SimpleAMQQueue implements A
}
- public void deliverAsync(QueueConsumer sub)
+ public void deliverAsync(QueueConsumer<?,E,Q,L> sub)
{
if(_exclusiveSubscriber == null)
{
@@ -1449,7 +1438,7 @@ public class SimpleAMQQueue implements A
}
- void flushConsumer(QueueConsumer sub) throws AMQException
+ void flushConsumer(QueueConsumer<?,E,Q,L> sub) throws AMQException
{
// Access control
if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
@@ -1459,7 +1448,7 @@ public class SimpleAMQQueue implements A
flushConsumer(sub, Long.MAX_VALUE);
}
- boolean flushConsumer(QueueConsumer sub, long iterations) throws AMQException
+ boolean flushConsumer(QueueConsumer<?,E,Q,L> sub, long iterations) throws AMQException
{
boolean atTail = false;
final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
@@ -1480,8 +1469,8 @@ public class SimpleAMQQueue implements A
sub.getSendLock();
}
- atTail = attemptDelivery((QueueConsumer)sub, true);
- if (atTail && getNextAvailableEntry((QueueConsumer)sub) == null)
+ atTail = attemptDelivery(sub, true);
+ if (atTail && getNextAvailableEntry(sub) == null)
{
queueEmpty = true;
}
@@ -1532,12 +1521,12 @@ public class SimpleAMQQueue implements A
* Looks up the next node for the consumer and attempts to deliver it.
*
*
- * @param sub
- * @param batch
+ * @param sub the consumer
+ * @param batch true if processing can be batched
* @return true if we have completed all possible deliveries for this sub.
* @throws AMQException
*/
- private boolean attemptDelivery(QueueConsumer sub, boolean batch) throws AMQException
+ private boolean attemptDelivery(QueueConsumer<?,E,Q,L> sub, boolean batch) throws AMQException
{
boolean atTail = false;
@@ -1545,7 +1534,7 @@ public class SimpleAMQQueue implements A
if (subActive)
{
- QueueEntry node = getNextAvailableEntry(sub);
+ E node = getNextAvailableEntry(sub);
if (node != null && node.isAvailable())
{
@@ -1582,11 +1571,11 @@ public class SimpleAMQQueue implements A
protected void advanceAllConsumers() throws AMQException
{
- QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
+ QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator();
while (consumerNodeIterator.advance())
{
- QueueConsumerList.ConsumerNode subNode = consumerNodeIterator.getNode();
- QueueConsumer sub = subNode.getConsumer();
+ QueueConsumerList.ConsumerNode<E,Q,L> subNode = consumerNodeIterator.getNode();
+ QueueConsumer<?,E,Q,L> sub = subNode.getConsumer();
if(sub.acquires())
{
getNextAvailableEntry(sub);
@@ -1598,16 +1587,16 @@ public class SimpleAMQQueue implements A
}
}
- private QueueEntry getNextAvailableEntry(final QueueConsumer sub)
+ private E getNextAvailableEntry(final QueueConsumer<?,E,Q,L> sub)
throws AMQException
{
- QueueContext context = sub.getQueueContext();
+ QueueContext<E,Q,L> context = sub.getQueueContext();
if(context != null)
{
- QueueEntry lastSeen = context.getLastSeenEntry();
- QueueEntry releasedNode = context.getReleasedEntry();
+ E lastSeen = context.getLastSeenEntry();
+ E releasedNode = context.getReleasedEntry();
- QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
+ E node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
boolean expired = false;
while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) ||
@@ -1639,12 +1628,12 @@ public class SimpleAMQQueue implements A
}
}
- public boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer sub)
+ public boolean isEntryAheadOfConsumer(E entry, QueueConsumer<?,E,Q,L> sub)
{
- QueueContext context = sub.getQueueContext();
+ QueueContext<E,Q,L> context = sub.getQueueContext();
if(context != null)
{
- QueueEntry releasedNode = context.getReleasedEntry();
+ E releasedNode = context.getReleasedEntry();
return releasedNode != null && releasedNode.compareTo(entry) < 0;
}
else
@@ -1681,7 +1670,7 @@ public class SimpleAMQQueue implements A
*/
public long processQueue(QueueRunner runner) throws AMQException
{
- long stateChangeCount = Long.MIN_VALUE;
+ long stateChangeCount;
long previousStateChangeCount = Long.MIN_VALUE;
long rVal = Long.MIN_VALUE;
boolean deliveryIncomplete = true;
@@ -1716,11 +1705,11 @@ public class SimpleAMQQueue implements A
boolean allConsumersDone = true;
boolean consumerDone;
- QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
+ QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator();
//iterate over the subscribers and try to advance their pointer
while (consumerNodeIterator.advance())
{
- QueueConsumer sub = consumerNodeIterator.getNode().getConsumer();
+ QueueConsumer<?,E,Q,L> sub = consumerNodeIterator.getNode().getConsumer();
sub.getSendLock();
try
@@ -1802,11 +1791,11 @@ public class SimpleAMQQueue implements A
public void checkMessageStatus() throws AMQException
{
- QueueEntryIterator queueListIterator = _entries.iterator();
+ QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
while (queueListIterator.advance())
{
- QueueEntry node = queueListIterator.getNode();
+ E node = queueListIterator.getNode();
// Only process nodes that are not currently deleted and not dequeued
if (!node.isDeleted())
{
@@ -1953,12 +1942,12 @@ public class SimpleAMQQueue implements A
return _notificationChecks;
}
- private final class QueueEntryListener implements StateChangeListener<MessageInstance<QueueConsumer>, QueueEntry.State>
+ private final class QueueEntryListener implements StateChangeListener<E, QueueEntry.State>
{
- private final QueueConsumer _sub;
+ private final QueueConsumer<?,E,Q,L> _sub;
- public QueueEntryListener(final QueueConsumer sub)
+ public QueueEntryListener(final QueueConsumer<?,E,Q,L> sub)
{
_sub = sub;
}
@@ -1974,7 +1963,7 @@ public class SimpleAMQQueue implements A
return System.identityHashCode(_sub);
}
- public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState)
+ public void stateChanged(E entry, QueueEntry.State oldSate, QueueEntry.State newState)
{
entry.removeStateChangeListener(this);
deliverAsync(_sub);
@@ -2082,13 +2071,13 @@ public class SimpleAMQQueue implements A
return _unackedMsgBytes.get();
}
- public void decrementUnackedMsgCount(QueueEntry queueEntry)
+ public void decrementUnackedMsgCount(E queueEntry)
{
_unackedMsgCount.decrementAndGet();
_unackedMsgBytes.addAndGet(-queueEntry.getSize());
}
- private void incrementUnackedMsgCount(QueueEntry entry)
+ private void incrementUnackedMsgCount(E entry)
{
_unackedMsgCount.incrementAndGet();
_unackedMsgBytes.addAndGet(entry.getSize());
@@ -2159,10 +2148,10 @@ public class SimpleAMQQueue implements A
return (String) _arguments.get(Queue.DESCRIPTION);
}
- public final int send(final ServerMessage message,
+ public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final Action<MessageInstance<? extends Consumer>> postEnqueueAction)
+ final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction)
{
txn.enqueue(this,message, new ServerTransaction.Action()
{
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1566069&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Sat Feb 8 17:52:05 2014
@@ -0,0 +1,25 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+public interface SimpleQueueEntryList<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends QueueEntryList<E,Q,L,QueueConsumer<?,E,Q,L>>
+{
+}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Sat Feb 8 17:52:05 2014
@@ -29,7 +29,7 @@ import org.apache.qpid.server.virtualhos
import java.util.Map;
import java.util.UUID;
-public class SortedQueue extends OutOfOrderQueue
+public class SortedQueue extends OutOfOrderQueue<SortedQueueEntry, SortedQueue, SortedQueueEntryList>
{
//Lock object to synchronize enqueue. Used instead of the object
//monitor to prevent lock order issues with consumer sendLocks
@@ -41,17 +41,33 @@ public class SortedQueue extends OutOfOr
final boolean durable, final String owner, final boolean autoDelete,
final boolean exclusive, final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName)
{
+ this(id, name, durable, owner, autoDelete, exclusive,
+ virtualHost, arguments, sortedPropertyName, new SortedQueueEntryListFactory(sortedPropertyName));
+ }
+
+
+ protected SortedQueue(UUID id, final String name,
+ final boolean durable, final String owner, final boolean autoDelete,
+ final boolean exclusive, final VirtualHost virtualHost,
+ Map<String, Object> arguments,
+ String sortedPropertyName,
+ QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList> factory)
+ {
super(id, name, durable, owner, autoDelete, exclusive,
- virtualHost, new SortedQueueEntryListFactory(sortedPropertyName), arguments);
+ virtualHost, factory, arguments);
this._sortedPropertyName = sortedPropertyName;
}
+
public String getSortedPropertyName()
{
return _sortedPropertyName;
}
- public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException
+ @Override
+ public void enqueue(final ServerMessage message,
+ final Action<? super MessageInstance<?, QueueConsumer<?, SortedQueueEntry, SortedQueue, SortedQueueEntryList>>> action)
+ throws AMQException
{
synchronized (_sortedQueueLock)
{
Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java (from r1565732, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java&r1=1565732&r2=1566069&rev=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java Sat Feb 8 17:52:05 2014
@@ -24,37 +24,37 @@ import org.apache.qpid.server.message.Se
/**
* An implementation of QueueEntryImpl to be used in SortedQueueEntryList.
*/
-public class SortedQueueEntryImpl extends QueueEntryImpl
+public class SortedQueueEntry extends QueueEntryImpl<SortedQueueEntry, SortedQueue, SortedQueueEntryList>
{
public static enum Colour
{
RED, BLACK
};
- private volatile SortedQueueEntryImpl _next;
- private SortedQueueEntryImpl _prev;
+ private volatile SortedQueueEntry _next;
+ private SortedQueueEntry _prev;
private String _key;
private Colour _colour = Colour.BLACK;
- private SortedQueueEntryImpl _parent;
- private SortedQueueEntryImpl _left;
- private SortedQueueEntryImpl _right;
+ private SortedQueueEntry _parent;
+ private SortedQueueEntry _left;
+ private SortedQueueEntry _right;
- public SortedQueueEntryImpl(final SortedQueueEntryList queueEntryList)
+ public SortedQueueEntry(final SortedQueueEntryList queueEntryList)
{
super(queueEntryList);
}
- public SortedQueueEntryImpl(final SortedQueueEntryList queueEntryList,
- final ServerMessage message, final long entryId)
+ public SortedQueueEntry(final SortedQueueEntryList queueEntryList,
+ final ServerMessage message, final long entryId)
{
super(queueEntryList, message, entryId);
}
@Override
- public int compareTo(final QueueEntry o)
+ public int compareTo(final SortedQueueEntry o)
{
- final String otherKey = ((SortedQueueEntryImpl) o)._key;
+ final String otherKey = o._key;
final int compare = _key == null ? (otherKey == null ? 0 : -1) : otherKey == null ? 1 : _key.compareTo(otherKey);
return compare == 0 ? super.compareTo(o) : compare;
}
@@ -69,33 +69,33 @@ public class SortedQueueEntryImpl extend
return _key;
}
- public SortedQueueEntryImpl getLeft()
+ public SortedQueueEntry getLeft()
{
return _left;
}
- public SortedQueueEntryImpl getNextNode()
+ public SortedQueueEntry getNextNode()
{
return _next;
}
@Override
- public SortedQueueEntryImpl getNextValidEntry()
+ public SortedQueueEntry getNextValidEntry()
{
return getNextNode();
}
- public SortedQueueEntryImpl getParent()
+ public SortedQueueEntry getParent()
{
return _parent;
}
- public SortedQueueEntryImpl getPrev()
+ public SortedQueueEntry getPrev()
{
return _prev;
}
- public SortedQueueEntryImpl getRight()
+ public SortedQueueEntry getRight()
{
return _right;
}
@@ -110,27 +110,27 @@ public class SortedQueueEntryImpl extend
_key = key;
}
- public void setLeft(final SortedQueueEntryImpl left)
+ public void setLeft(final SortedQueueEntry left)
{
_left = left;
}
- public void setNext(final SortedQueueEntryImpl next)
+ public void setNext(final SortedQueueEntry next)
{
_next = next;
}
- public void setParent(final SortedQueueEntryImpl parent)
+ public void setParent(final SortedQueueEntry parent)
{
_parent = parent;
}
- public void setPrev(final SortedQueueEntryImpl prev)
+ public void setPrev(final SortedQueueEntry prev)
{
_prev = prev;
}
- public void setRight(final SortedQueueEntryImpl right)
+ public void setRight(final SortedQueueEntry right)
{
_right = right;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org