You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/27 00:27:43 UTC
svn commit: r1572343 [3/7] - in /qpid/trunk/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/binding/
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/exchange/
broker-core/src/main/...
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Feb 26 23:27:39 2014
@@ -18,6 +18,8 @@
*/
package org.apache.qpid.server.queue;
+import java.lang.reflect.Type;
+import java.security.AccessControlException;
import java.security.AccessController;
import java.security.Principal;
import java.util.*;
@@ -30,14 +32,17 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.binding.BindingImpl;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.model.ExclusivityPolicy;
-import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.exchange.NonDefaultExchange;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.adapter.AbstractConfiguredObject;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
-import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -49,11 +54,11 @@ import org.apache.qpid.server.message.In
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
@@ -65,14 +70,14 @@ import org.apache.qpid.server.util.Serve
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import javax.management.NotificationListener;
import javax.security.auth.Subject;
-abstract class AbstractQueue<E extends QueueEntryImpl<E,Q,L>,
- Q extends AbstractQueue<E, Q,L>,
- L extends QueueEntryListBase<E,Q,L>>
- implements AMQQueue<E, Q, QueueConsumer<?,E,Q,L>>,
- StateChangeListener<QueueConsumer<?,E,Q,L>, QueueConsumer.State>,
- MessageGroupManager.ConsumerResetHelper<E,Q,L>
+public abstract class AbstractQueue
+ extends AbstractConfiguredObject<AbstractQueue>
+ implements AMQQueue<AbstractQueue>,
+ StateChangeListener<QueueConsumer<?>, State>,
+ MessageGroupManager.ConsumerResetHelper
{
private static final Logger _logger = Logger.getLogger(AbstractQueue.class);
@@ -83,6 +88,16 @@ abstract class AbstractQueue<E extends Q
// TODO - should make this configurable at the vhost / broker level
private static final int DEFAULT_MAX_GROUPS = 255;
+ private static final QueueNotificationListener NULL_NOTIFICATION_LISTENER = new QueueNotificationListener()
+ {
+ @Override
+ public void notifyClients(final NotificationCheck notification,
+ final Queue queue,
+ final String notificationMsg)
+ {
+
+ }
+ };
private final VirtualHost _virtualHost;
@@ -93,14 +108,14 @@ abstract class AbstractQueue<E extends Q
private final boolean _durable;
- private Exchange _alternateExchange;
+ private NonDefaultExchange _alternateExchange;
- private final L _entries;
+ private final QueueEntryList _entries;
- private final QueueConsumerList<E,Q,L> _consumerList = new QueueConsumerList<E,Q,L>();
+ private final QueueConsumerList _consumerList = new QueueConsumerList();
- private volatile QueueConsumer<?,E,Q,L> _exclusiveSubscriber;
+ private volatile QueueConsumer<?> _exclusiveSubscriber;
@@ -163,8 +178,8 @@ abstract class AbstractQueue<E extends Q
private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
private final AtomicBoolean _deleted = new AtomicBoolean(false);
- private final List<Action<? super Q>> _deleteTaskList =
- new CopyOnWriteArrayList<Action<? super Q>>();
+ private final List<Action<? super AMQQueue>> _deleteTaskList =
+ new CopyOnWriteArrayList<Action<? super AMQQueue>>();
private LogSubject _logSubject;
@@ -173,7 +188,7 @@ abstract class AbstractQueue<E extends Q
private boolean _noLocal;
private final AtomicBoolean _overfull = new AtomicBoolean(false);
- private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
+ private final CopyOnWriteArrayList<BindingImpl> _bindings = new CopyOnWriteArrayList<BindingImpl>();
private UUID _id;
private final Map<String, Object> _arguments;
@@ -182,18 +197,20 @@ abstract class AbstractQueue<E extends Q
/** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
private int _maximumDeliveryCount;
- private final MessageGroupManager<E,Q,L> _messageGroupManager;
+ private final MessageGroupManager _messageGroupManager;
- private final Collection<ConsumerRegistrationListener<Q>> _consumerListeners =
- new ArrayList<ConsumerRegistrationListener<Q>>();
+ private final Collection<ConsumerRegistrationListener<? super MessageSource>> _consumerListeners =
+ new ArrayList<ConsumerRegistrationListener<? super MessageSource>>();
- private AMQQueue.NotificationListener _notificationListener;
+ private QueueNotificationListener _notificationListener;
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
protected AbstractQueue(VirtualHost virtualHost,
Map<String, Object> attributes,
- QueueEntryListFactory<E, Q, L> entryListFactory)
+ QueueEntryListFactory entryListFactory)
{
+ super(MapValueConverter.getUUIDAttribute(Queue.ID, attributes),
+ Collections.<String,Object>emptyMap(), attributes, virtualHost.getTaskExecutor());
if (virtualHost == null)
{
throw new IllegalArgumentException("Virtual Host must not be null");
@@ -223,7 +240,7 @@ abstract class AbstractQueue<E extends Q
_name = name;
_durable = durable;
_virtualHost = virtualHost;
- _entries = entryListFactory.createQueueEntryList((Q) this);
+ _entries = entryListFactory.createQueueEntryList(this);
final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes);
arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy);
@@ -421,13 +438,13 @@ abstract class AbstractQueue<E extends Q
{
Object defaultGroup = attributes.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
_messageGroupManager =
- new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(attributes.get(Queue.MESSAGE_GROUP_KEY)),
+ new DefinedGroupMessageGroupManager(String.valueOf(attributes.get(Queue.MESSAGE_GROUP_KEY)),
defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
this);
}
else
{
- _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(attributes.get(
+ _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(attributes.get(
Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
}
}
@@ -499,11 +516,6 @@ abstract class AbstractQueue<E extends Q
_noLocal = nolocal;
}
- public UUID getId()
- {
- return _id;
- }
-
public boolean isDurable()
{
return _durable;
@@ -514,12 +526,12 @@ abstract class AbstractQueue<E extends Q
return _exclusivityPolicy != ExclusivityPolicy.NONE;
}
- public Exchange getAlternateExchange()
+ public NonDefaultExchange getAlternateExchange()
{
return _alternateExchange;
}
- public void setAlternateExchange(Exchange exchange)
+ public void setAlternateExchange(NonDefaultExchange exchange)
{
if(_alternateExchange != null)
{
@@ -540,9 +552,120 @@ abstract class AbstractQueue<E extends Q
}
@Override
- public Object getAttribute(String attrName)
+ public Object getAttribute(String name)
{
- return _arguments.get(attrName);
+ if(ALTERNATE_EXCHANGE.equals(name))
+ {
+ return getAlternateExchange();
+ }
+ else if(OWNER.equals(name))
+ {
+ return getOwner();
+ }
+ else if(NAME.equals(name))
+ {
+ return getName();
+ }
+ if(ALERT_REPEAT_GAP.equals(name))
+ {
+ return getAlertRepeatGap();
+ }
+ else if(ALERT_THRESHOLD_MESSAGE_AGE.equals(name))
+ {
+ return getAlertThresholdMessageAge();
+ }
+ else if(ALERT_THRESHOLD_MESSAGE_SIZE.equals(name))
+ {
+ return getAlertThresholdMessageSize();
+ }
+ else if(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES.equals(name))
+ {
+ return getAlertThresholdQueueDepthBytes();
+ }
+ else if(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES.equals(name))
+ {
+ return getAlertThresholdQueueDepthMessages();
+ }
+ else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name))
+ {
+ //We only return the boolean value if message groups are actually in use
+ return _arguments.get(MESSAGE_GROUP_KEY) == null ? null : _arguments.get(MESSAGE_GROUP_SHARED_GROUPS);
+ }
+ else if(LVQ_KEY.equals(name))
+ {
+ if(this instanceof ConflationQueue)
+ {
+ return ((ConflationQueue)this).getConflationKey();
+ }
+ }
+ else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name))
+ {
+ return getMaximumDeliveryAttempts();
+ }
+ else if(QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name))
+ {
+ return getQueueFlowControlSizeBytes();
+ }
+ else if(QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name))
+ {
+ return getQueueFlowResumeSizeBytes();
+ }
+ else if(QUEUE_FLOW_STOPPED.equals(name))
+ {
+ return isOverfull();
+ }
+ else if(SORT_KEY.equals(name))
+ {
+ if(this instanceof SortedQueue)
+ {
+ return ((SortedQueue)this).getSortedPropertyName();
+ }
+ }
+ else if(QUEUE_TYPE.equals(name))
+ {
+ if(this instanceof SortedQueue)
+ {
+ return "sorted";
+ }
+ if(this instanceof ConflationQueue)
+ {
+ return "lvq";
+ }
+ if(this instanceof PriorityQueue)
+ {
+ return "priority";
+ }
+ return "standard";
+ }
+ else if(DURABLE.equals(name))
+ {
+ return isDurable();
+ }
+ else if(ID.equals(name))
+ {
+ return getId();
+ }
+ else if(LIFETIME_POLICY.equals(name))
+ {
+ return getLifetimePolicy();
+ }
+ else if(STATE.equals(name))
+ {
+ return State.ACTIVE; // TODO
+ }
+ else if (DESCRIPTION.equals(name))
+ {
+ return getDescription();
+ }
+ else if(PRIORITIES.equals(name))
+ {
+ if(this instanceof PriorityQueue)
+ {
+ return ((PriorityQueue)this).getPriorities();
+ }
+ }
+
+ return _arguments.get(name);
}
@Override
@@ -580,7 +703,7 @@ abstract class AbstractQueue<E extends Q
@Override
- public synchronized <T extends ConsumerTarget> QueueConsumer<T,E,Q,L> addConsumer(final T target,
+ public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target,
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
@@ -678,11 +801,12 @@ abstract class AbstractQueue<E extends Q
throw new ExistingConsumerPreventsExclusive();
}
- QueueConsumerImpl<T,E,Q,L> consumer = new QueueConsumerImpl<T,E,Q,L>((Q)this,
- target,
- consumerName,
- filters, messageClass,
- optionSet);
+ QueueConsumerImpl consumer = new QueueConsumerImpl(this,
+ target,
+ consumerName,
+ filters,
+ messageClass,
+ optionSet);
_exclusiveOwner = exclusiveOwner;
target.consumerAdded(consumer);
@@ -699,15 +823,15 @@ abstract class AbstractQueue<E extends Q
}
consumer.setStateListener(this);
- consumer.setQueueContext(new QueueContext<E,Q,L>(_entries.getHead()));
+ consumer.setQueueContext(new QueueContext(_entries.getHead()));
if (!isDeleted())
{
synchronized (_consumerListeners)
{
- for(ConsumerRegistrationListener<Q> listener : _consumerListeners)
+ for(ConsumerRegistrationListener<? super MessageSource> listener : _consumerListeners)
{
- listener.consumerAdded((Q)this, consumer);
+ listener.consumerAdded(this, consumer);
}
}
@@ -729,7 +853,7 @@ abstract class AbstractQueue<E extends Q
}
- synchronized void unregisterConsumer(final QueueConsumerImpl<?,E,Q,L> consumer)
+ synchronized void unregisterConsumer(final QueueConsumerImpl consumer)
{
if (consumer == null)
{
@@ -758,9 +882,9 @@ abstract class AbstractQueue<E extends Q
synchronized (_consumerListeners)
{
- for(ConsumerRegistrationListener<Q> listener : _consumerListeners)
+ for(ConsumerRegistrationListener<? super MessageSource> listener : _consumerListeners)
{
- listener.consumerRemoved((Q)this, consumer);
+ listener.consumerRemoved(this, consumer);
}
}
@@ -787,10 +911,10 @@ abstract class AbstractQueue<E extends Q
}
- public Collection<QueueConsumer<?,E,Q,L>> getConsumers()
+ public Collection<QueueConsumer<?>> getConsumers()
{
- List<QueueConsumer<?,E,Q,L>> consumers = new ArrayList<QueueConsumer<?,E,Q,L>>();
- QueueConsumerList.ConsumerNodeIterator<E,Q,L> iter = _consumerList.iterator();
+ List<QueueConsumer<?>> consumers = new ArrayList<QueueConsumer<?>>();
+ QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator();
while(iter.advance())
{
consumers.add(iter.getNode().getConsumer());
@@ -799,7 +923,7 @@ abstract class AbstractQueue<E extends Q
}
- public void addConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener)
+ public void addConsumerRegistrationListener(final ConsumerRegistrationListener<? super MessageSource> listener)
{
synchronized (_consumerListeners)
{
@@ -807,7 +931,7 @@ abstract class AbstractQueue<E extends Q
}
}
- public void removeConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener)
+ public void removeConsumerRegistrationListener(final ConsumerRegistrationListener<? super MessageSource> listener)
{
synchronized (_consumerListeners)
{
@@ -815,9 +939,9 @@ abstract class AbstractQueue<E extends Q
}
}
- public void resetSubPointersForGroups(QueueConsumer<?,E,Q,L> consumer, boolean clearAssignments)
+ public void resetSubPointersForGroups(QueueConsumer<?> consumer, boolean clearAssignments)
{
- E entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
+ QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
if(clearAssignments)
{
_messageGroupManager.clearAssignments(consumer);
@@ -825,11 +949,11 @@ abstract class AbstractQueue<E extends Q
if(entry != null)
{
- QueueConsumerList.ConsumerNodeIterator<E,Q,L> subscriberIter = _consumerList.iterator();
+ QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance())
{
- QueueConsumer<?,E,Q,L> sub = subscriberIter.getNode().getConsumer();
+ QueueConsumer<?> sub = subscriberIter.getNode().getConsumer();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -843,7 +967,7 @@ abstract class AbstractQueue<E extends Q
}
}
- public void addBinding(final Binding binding)
+ public void addBinding(final BindingImpl binding)
{
_bindings.add(binding);
int bindingCount = _bindings.size();
@@ -857,12 +981,12 @@ abstract class AbstractQueue<E extends Q
}
}
- public void removeBinding(final Binding binding)
+ public void removeBinding(final BindingImpl binding)
{
_bindings.remove(binding);
}
- public List<Binding> getBindings()
+ public Collection<BindingImpl> getBindings()
{
return Collections.unmodifiableList(_bindings);
}
@@ -879,7 +1003,7 @@ abstract class AbstractQueue<E extends Q
// ------ Enqueue / Dequeue
- public void enqueue(ServerMessage message, Action<? super MessageInstance<?, QueueConsumer<?,E,Q,L>>> action)
+ public void enqueue(ServerMessage message, Action<? super MessageInstance> action)
{
incrementQueueCount();
incrementQueueSize(message);
@@ -887,8 +1011,8 @@ abstract class AbstractQueue<E extends Q
_totalMessagesReceived.incrementAndGet();
- E entry;
- final QueueConsumer<?,E,Q,L> exclusiveSub = _exclusiveSubscriber;
+ QueueEntry entry;
+ final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber;
entry = _entries.add(message);
if(action != null || (exclusiveSub == null && _queueRunner.isIdle()))
@@ -898,8 +1022,8 @@ abstract class AbstractQueue<E extends Q
iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message
*/
- QueueConsumerList.ConsumerNode<E,Q,L> node = _consumerList.getMarkedNode();
- QueueConsumerList.ConsumerNode<E,Q,L> nextNode = node.findNext();
+ QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode();
+ QueueConsumerList.ConsumerNode nextNode = node.findNext();
if (nextNode == null)
{
nextNode = _consumerList.getHead().findNext();
@@ -935,7 +1059,7 @@ abstract class AbstractQueue<E extends Q
else
{
// if consumer at end, and active, offer
- QueueConsumer<?,E,Q,L> sub = nextNode.getConsumer();
+ QueueConsumer<?> sub = nextNode.getConsumer();
deliverToConsumer(sub, entry);
}
nextNode = nextNode.findNext();
@@ -967,7 +1091,7 @@ abstract class AbstractQueue<E extends Q
}
- private void deliverToConsumer(final QueueConsumer<?,E,Q,L> sub, final E entry)
+ private void deliverToConsumer(final QueueConsumer<?> sub, final QueueEntry entry)
{
if(sub.trySendLock())
@@ -998,7 +1122,7 @@ abstract class AbstractQueue<E extends Q
}
}
- private boolean assign(final QueueConsumer<?,E,Q,L> sub, final E entry)
+ private boolean assign(final QueueConsumer<?> sub, final QueueEntry entry)
{
if(_messageGroupManager == null)
{
@@ -1012,17 +1136,17 @@ abstract class AbstractQueue<E extends Q
}
}
- private boolean mightAssign(final QueueConsumer<?,E,Q,L> sub, final E entry)
+ private boolean mightAssign(final QueueConsumer sub, final QueueEntry entry)
{
if(_messageGroupManager == null || !sub.acquires())
{
return true;
}
- QueueConsumer<?,E,Q,L> assigned = _messageGroupManager.getAssignedConsumer(entry);
+ QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry);
return (assigned == null) || (assigned == sub);
}
- protected void checkConsumersNotAheadOfDelivery(final E entry)
+ protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
{
// This method is only required for queues which mess with ordering
// Simple Queues don't :-)
@@ -1041,12 +1165,12 @@ abstract class AbstractQueue<E extends Q
}
}
- public long getTotalDequeueCount()
+ public long getTotalDequeuedMessages()
{
return _dequeueCount.get();
}
- public long getTotalEnqueueCount()
+ public long getTotalEnqueuedMessages()
{
return _enqueueCount.get();
}
@@ -1056,7 +1180,7 @@ abstract class AbstractQueue<E extends Q
getAtomicQueueCount().incrementAndGet();
}
- private void deliverMessage(final QueueConsumer<?,E,Q,L> sub, final E entry, boolean batch)
+ private void deliverMessage(final QueueConsumer<?> sub, final QueueEntry entry, boolean batch)
{
setLastSeenEntry(sub, entry);
@@ -1066,18 +1190,18 @@ abstract class AbstractQueue<E extends Q
sub.send(entry, batch);
}
- private boolean consumerReadyAndHasInterest(final QueueConsumer<?,E,Q,L> sub, final E entry)
+ private boolean consumerReadyAndHasInterest(final QueueConsumer<?> sub, final QueueEntry entry)
{
return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
}
- private void setLastSeenEntry(final QueueConsumer<?,E,Q,L> sub, final E entry)
+ private void setLastSeenEntry(final QueueConsumer<?> sub, final QueueEntry entry)
{
- QueueContext<E,Q,L> subContext = sub.getQueueContext();
+ QueueContext subContext = sub.getQueueContext();
if (subContext != null)
{
- E releasedEntry = subContext.getReleasedEntry();
+ QueueEntry releasedEntry = subContext.getReleasedEntry();
QueueContext._lastSeenUpdater.set(subContext, entry);
if(releasedEntry == entry)
@@ -1087,13 +1211,13 @@ abstract class AbstractQueue<E extends Q
}
}
- private void updateSubRequeueEntry(final QueueConsumer<?,E,Q,L> sub, final E entry)
+ private void updateSubRequeueEntry(final QueueConsumer<?> sub, final QueueEntry entry)
{
- QueueContext<E,Q,L> subContext = sub.getQueueContext();
+ QueueContext subContext = sub.getQueueContext();
if(subContext != null)
{
- E oldEntry;
+ QueueEntry oldEntry;
while((oldEntry = subContext.getReleasedEntry()) == null || oldEntry.compareTo(entry) > 0)
{
@@ -1105,13 +1229,13 @@ abstract class AbstractQueue<E extends Q
}
}
- public void requeue(E entry)
+ public void requeue(QueueEntry entry)
{
- QueueConsumerList.ConsumerNodeIterator<E,Q,L> subscriberIter = _consumerList.iterator();
+ QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance() && entry.isAvailable())
{
- QueueConsumer<?,E,Q,L> sub = subscriberIter.getNode().getConsumer();
+ QueueConsumer<?> sub = subscriberIter.getNode().getConsumer();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -1125,7 +1249,7 @@ abstract class AbstractQueue<E extends Q
}
@Override
- public void dequeue(E entry)
+ public void dequeue(QueueEntry entry)
{
decrementQueueCount();
decrementQueueSize(entry);
@@ -1138,7 +1262,7 @@ abstract class AbstractQueue<E extends Q
}
- private void decrementQueueSize(final E entry)
+ private void decrementQueueSize(final QueueEntry entry)
{
final ServerMessage message = entry.getMessage();
long size = message.getSize();
@@ -1157,7 +1281,7 @@ abstract class AbstractQueue<E extends Q
_dequeueCount.incrementAndGet();
}
- public boolean resend(final E entry, final QueueConsumer<?,E,Q,L> consumer)
+ public boolean resend(final QueueEntry entry, final QueueConsumer<?> consumer)
{
/* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message
entry to resend and move back the consumer pointer. */
@@ -1188,7 +1312,7 @@ abstract class AbstractQueue<E extends Q
return _consumerList.size();
}
- public int getActiveConsumerCount()
+ public int getConsumerCountWithCredit()
{
return _activeSubscriberCount.get();
}
@@ -1200,22 +1324,23 @@ abstract class AbstractQueue<E extends Q
public boolean isEmpty()
{
- return getMessageCount() == 0;
+ return getQueueDepthMessages() == 0;
}
- public int getMessageCount()
+ @Override
+ public int getQueueDepthMessages()
{
return getAtomicQueueCount().get();
}
- public long getQueueDepth()
+ public long getQueueDepthBytes()
{
return getAtomicQueueSize().get();
}
public int getUndeliveredMessageCount()
{
- int count = getMessageCount() - _deliveredMessages.get();
+ int count = getQueueDepthMessages() - _deliveredMessages.get();
if (count < 0)
{
return 0;
@@ -1233,11 +1358,11 @@ abstract class AbstractQueue<E extends Q
public long getOldestMessageArrivalTime()
{
- E entry = getOldestQueueEntry();
+ QueueEntry entry = getOldestQueueEntry();
return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
}
- protected E getOldestQueueEntry()
+ protected QueueEntry getOldestQueueEntry()
{
return _entries.next(_entries.getHead());
}
@@ -1247,13 +1372,13 @@ abstract class AbstractQueue<E extends Q
return _deleted.get();
}
- public List<E> getMessagesOnTheQueue()
+ public List<QueueEntry> getMessagesOnTheQueue()
{
- ArrayList<E> entryList = new ArrayList<E>();
- QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
+ ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
+ QueueEntryIterator queueListIterator = _entries.iterator();
while (queueListIterator.advance())
{
- E node = queueListIterator.getNode();
+ QueueEntry node = queueListIterator.getNode();
if (node != null && !node.isDeleted())
{
entryList.add(node);
@@ -1263,16 +1388,16 @@ abstract class AbstractQueue<E extends Q
}
- public void stateChanged(QueueConsumer<?,E,Q,L> sub, QueueConsumer.State oldState, QueueConsumer.State newState)
+ public void stateChanged(QueueConsumer<?> sub, State oldState, State newState)
{
- if (oldState == QueueConsumer.State.ACTIVE && newState != QueueConsumer.State.ACTIVE)
+ if (oldState == State.ACTIVE && newState != State.ACTIVE)
{
_activeSubscriberCount.decrementAndGet();
}
- else if (newState == QueueConsumer.State.ACTIVE)
+ else if (newState == State.ACTIVE)
{
- if (oldState != QueueConsumer.State.ACTIVE)
+ if (oldState != State.ACTIVE)
{
_activeSubscriberCount.incrementAndGet();
@@ -1281,7 +1406,7 @@ abstract class AbstractQueue<E extends Q
}
}
- public int compareTo(final Q o)
+ public int compareTo(final AMQQueue o)
{
return _name.compareTo(o.getName());
}
@@ -1301,7 +1426,7 @@ abstract class AbstractQueue<E extends Q
return _exclusiveSubscriber != null;
}
- private void setExclusiveSubscriber(QueueConsumer<?,E,Q,L> exclusiveSubscriber)
+ private void setExclusiveSubscriber(QueueConsumer<?> exclusiveSubscriber)
{
_exclusiveSubscriber = exclusiveSubscriber;
}
@@ -1312,32 +1437,32 @@ abstract class AbstractQueue<E extends Q
}
/** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
- protected L getEntries()
+ protected QueueEntryList getEntries()
{
return _entries;
}
- protected QueueConsumerList<E,Q,L> getConsumerList()
+ protected QueueConsumerList getConsumerList()
{
return _consumerList;
}
- public static interface QueueEntryFilter<E extends QueueEntry>
+ public static interface QueueEntryFilter
{
- public boolean accept(E entry);
+ public boolean accept(QueueEntry entry);
public boolean filterComplete();
}
- public List<E> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
+ public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
{
- return getMessagesOnTheQueue(new QueueEntryFilter<E>()
+ return getMessagesOnTheQueue(new QueueEntryFilter()
{
- public boolean accept(E entry)
+ public boolean accept(QueueEntry entry)
{
final long messageId = entry.getMessage().getMessageNumber();
return messageId >= fromMessageId && messageId <= toMessageId;
@@ -1350,13 +1475,13 @@ abstract class AbstractQueue<E extends Q
});
}
- public E getMessageOnTheQueue(final long messageId)
+ public QueueEntry getMessageOnTheQueue(final long messageId)
{
- List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>()
+ List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
private boolean _complete;
- public boolean accept(E entry)
+ public boolean accept(QueueEntry entry)
{
_complete = entry.getMessage().getMessageNumber() == messageId;
return _complete;
@@ -1370,13 +1495,13 @@ abstract class AbstractQueue<E extends Q
return entries.isEmpty() ? null : entries.get(0);
}
- public List<E> getMessagesOnTheQueue(QueueEntryFilter<E> filter)
+ public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter)
{
- ArrayList<E> entryList = new ArrayList<E>();
- QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
+ ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
+ QueueEntryIterator queueListIterator = _entries.iterator();
while (queueListIterator.advance() && !filter.filterComplete())
{
- E node = queueListIterator.getNode();
+ QueueEntry node = queueListIterator.getNode();
if (!node.isDeleted() && filter.accept(node))
{
entryList.add(node);
@@ -1386,13 +1511,13 @@ abstract class AbstractQueue<E extends Q
}
- public void visit(final QueueEntryVisitor<E> visitor)
+ public void visit(final QueueEntryVisitor visitor)
{
- QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
+ QueueEntryIterator queueListIterator = _entries.iterator();
while(queueListIterator.advance())
{
- E node = queueListIterator.getNode();
+ QueueEntry node = queueListIterator.getNode();
if(!node.isDeleted())
{
@@ -1413,13 +1538,13 @@ abstract class AbstractQueue<E extends Q
* @param toPosition last message position
* @return list of messages
*/
- public List<E> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition)
+ public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition)
{
- return getMessagesOnTheQueue(new QueueEntryFilter<E>()
+ return getMessagesOnTheQueue(new QueueEntryFilter()
{
private long position = 0;
- public boolean accept(E entry)
+ public boolean accept(QueueEntry entry)
{
position++;
return (position >= fromPosition) && (position <= toPosition);
@@ -1450,14 +1575,14 @@ abstract class AbstractQueue<E extends Q
//Perform ACLs
getVirtualHost().getSecurityManager().authorisePurge(this);
- QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
+ QueueEntryIterator queueListIterator = _entries.iterator();
long count = 0;
ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
while (queueListIterator.advance())
{
- E node = queueListIterator.getNode();
+ QueueEntry node = queueListIterator.getNode();
if (node.acquire())
{
dequeueEntry(node, txn);
@@ -1474,13 +1599,13 @@ abstract class AbstractQueue<E extends Q
return count;
}
- private void dequeueEntry(final E node)
+ private void dequeueEntry(final QueueEntry node)
{
ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore());
dequeueEntry(node, txn);
}
- private void dequeueEntry(final E node, ServerTransaction txn)
+ private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
{
txn.dequeue(this, node.getMessage(),
new ServerTransaction.Action()
@@ -1499,13 +1624,13 @@ abstract class AbstractQueue<E extends Q
}
@Override
- public void addDeleteTask(final Action<? super Q> task)
+ public void addDeleteTask(final Action<? super AMQQueue> task)
{
_deleteTaskList.add(task);
}
@Override
- public void removeDeleteTask(final Action<? super Q> task)
+ public void removeDeleteTask(final Action<? super AMQQueue> task)
{
_deleteTaskList.remove(task);
}
@@ -1519,9 +1644,9 @@ abstract class AbstractQueue<E extends Q
if (!_deleted.getAndSet(true))
{
- final ArrayList<Binding> bindingCopy = new ArrayList<Binding>(_bindings);
+ final ArrayList<BindingImpl> bindingCopy = new ArrayList<BindingImpl>(_bindings);
- for (Binding b : bindingCopy)
+ for (BindingImpl b : bindingCopy)
{
b.delete();
}
@@ -1538,10 +1663,10 @@ abstract class AbstractQueue<E extends Q
}
- List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>()
+ List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
- public boolean accept(E entry)
+ public boolean accept(QueueEntry entry)
{
return entry.acquire();
}
@@ -1555,7 +1680,7 @@ abstract class AbstractQueue<E extends Q
ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
- for(final E entry : entries)
+ for(final QueueEntry entry : entries)
{
// TODO log requeues with a post enqueue action
int requeues = entry.routeToAlternate(null, txn);
@@ -1574,9 +1699,9 @@ abstract class AbstractQueue<E extends Q
}
- for (Action<? super Q> task : _deleteTaskList)
+ for (Action<? super AMQQueue> task : _deleteTaskList)
{
- task.performAction((Q)this);
+ task.performAction(this);
}
_deleteTaskList.clear();
@@ -1586,7 +1711,7 @@ abstract class AbstractQueue<E extends Q
CurrentActor.get().message(_logSubject, QueueMessages.DELETED());
}
- return getMessageCount();
+ return getQueueDepthMessages();
}
@@ -1660,7 +1785,7 @@ abstract class AbstractQueue<E extends Q
}
- public void deliverAsync(QueueConsumer<?,E,Q,L> sub)
+ public void deliverAsync(QueueConsumer<?> sub)
{
if(_exclusiveSubscriber == null)
{
@@ -1674,13 +1799,13 @@ abstract class AbstractQueue<E extends Q
}
- void flushConsumer(QueueConsumer<?,E,Q,L> sub)
+ void flushConsumer(QueueConsumer<?> sub)
{
flushConsumer(sub, Long.MAX_VALUE);
}
- boolean flushConsumer(QueueConsumer<?,E,Q,L> sub, long iterations)
+ boolean flushConsumer(QueueConsumer<?> sub, long iterations)
{
boolean atTail = false;
final boolean keepSendLockHeld = iterations <= AbstractQueue.MAX_ASYNC_DELIVERIES;
@@ -1757,7 +1882,7 @@ abstract class AbstractQueue<E extends Q
* @param batch true if processing can be batched
* @return true if we have completed all possible deliveries for this sub.
*/
- private boolean attemptDelivery(QueueConsumer<?,E,Q,L> sub, boolean batch)
+ private boolean attemptDelivery(QueueConsumer<?> sub, boolean batch)
{
boolean atTail = false;
@@ -1765,7 +1890,7 @@ abstract class AbstractQueue<E extends Q
if (subActive)
{
- E node = getNextAvailableEntry(sub);
+ QueueEntry node = getNextAvailableEntry(sub);
if (node != null && node.isAvailable())
{
@@ -1802,11 +1927,11 @@ abstract class AbstractQueue<E extends Q
protected void advanceAllConsumers()
{
- QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator();
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
while (consumerNodeIterator.advance())
{
- QueueConsumerList.ConsumerNode<E,Q,L> subNode = consumerNodeIterator.getNode();
- QueueConsumer<?,E,Q,L> sub = subNode.getConsumer();
+ QueueConsumerList.ConsumerNode subNode = consumerNodeIterator.getNode();
+ QueueConsumer sub = subNode.getConsumer();
if(sub.acquires())
{
getNextAvailableEntry(sub);
@@ -1818,15 +1943,15 @@ abstract class AbstractQueue<E extends Q
}
}
- private E getNextAvailableEntry(final QueueConsumer<?,E,Q,L> sub)
+ private QueueEntry getNextAvailableEntry(final QueueConsumer sub)
{
- QueueContext<E,Q,L> context = sub.getQueueContext();
+ QueueContext context = sub.getQueueContext();
if(context != null)
{
- E lastSeen = context.getLastSeenEntry();
- E releasedNode = context.getReleasedEntry();
+ QueueEntry lastSeen = context.getLastSeenEntry();
+ QueueEntry releasedNode = context.getReleasedEntry();
- E node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
+ QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
boolean expired = false;
while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) ||
@@ -1858,12 +1983,12 @@ abstract class AbstractQueue<E extends Q
}
}
- public boolean isEntryAheadOfConsumer(E entry, QueueConsumer<?,E,Q,L> sub)
+ public boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer<?> sub)
{
- QueueContext<E,Q,L> context = sub.getQueueContext();
+ QueueContext context = sub.getQueueContext();
if(context != null)
{
- E releasedNode = context.getReleasedEntry();
+ QueueEntry releasedNode = context.getReleasedEntry();
return releasedNode != null && releasedNode.compareTo(entry) < 0;
}
else
@@ -1934,11 +2059,11 @@ abstract class AbstractQueue<E extends Q
boolean allConsumersDone = true;
boolean consumerDone;
- QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator();
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
//iterate over the subscribers and try to advance their pointer
while (consumerNodeIterator.advance())
{
- QueueConsumer<?,E,Q,L> sub = consumerNodeIterator.getNode().getConsumer();
+ QueueConsumer<?> sub = consumerNodeIterator.getNode().getConsumer();
sub.getSendLock();
try
@@ -2020,11 +2145,11 @@ abstract class AbstractQueue<E extends Q
public void checkMessageStatus()
{
- QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
+ QueueEntryIterator queueListIterator = _entries.iterator();
while (queueListIterator.advance())
{
- E node = queueListIterator.getNode();
+ QueueEntry node = queueListIterator.getNode();
// Only process nodes that are not currently deleted and not dequeued
if (!node.isDeleted())
{
@@ -2054,7 +2179,7 @@ abstract class AbstractQueue<E extends Q
}
- public long getMinimumAlertRepeatGap()
+ public long getAlertRepeatGap()
{
return _minimumAlertRepeatGap;
}
@@ -2064,7 +2189,7 @@ abstract class AbstractQueue<E extends Q
_minimumAlertRepeatGap = minimumAlertRepeatGap;
}
- public long getMaximumMessageAge()
+ public long getAlertThresholdMessageAge()
{
return _maximumMessageAge;
}
@@ -2082,7 +2207,7 @@ abstract class AbstractQueue<E extends Q
}
}
- public long getMaximumMessageCount()
+ public long getAlertThresholdQueueDepthMessages()
{
return _maximumMessageCount;
}
@@ -2101,7 +2226,7 @@ abstract class AbstractQueue<E extends Q
}
- public long getMaximumQueueDepth()
+ public long getAlertThresholdQueueDepthBytes()
{
return _maximumQueueDepth;
}
@@ -2121,7 +2246,7 @@ abstract class AbstractQueue<E extends Q
}
- public long getMaximumMessageSize()
+ public long getAlertThresholdMessageSize()
{
return _maximumMessageSize;
}
@@ -2139,7 +2264,7 @@ abstract class AbstractQueue<E extends Q
}
}
- public long getCapacity()
+ public long getQueueFlowControlSizeBytes()
{
return _capacity;
}
@@ -2149,7 +2274,7 @@ abstract class AbstractQueue<E extends Q
_capacity = capacity;
}
- public long getFlowResumeCapacity()
+ public long getQueueFlowResumeSizeBytes()
{
return _flowResumeCapacity;
}
@@ -2191,12 +2316,12 @@ abstract class AbstractQueue<E extends Q
}
}
- private final class QueueEntryListener implements StateChangeListener<E, QueueEntry.State>
+ private final class QueueEntryListener implements StateChangeListener<MessageInstance, QueueEntry.State>
{
- private final QueueConsumer<?,E,Q,L> _sub;
+ private final QueueConsumer<?> _sub;
- public QueueEntryListener(final QueueConsumer<?,E,Q,L> sub)
+ public QueueEntryListener(final QueueConsumer<?> sub)
{
_sub = sub;
}
@@ -2212,7 +2337,7 @@ abstract class AbstractQueue<E extends Q
return System.identityHashCode(_sub);
}
- public void stateChanged(E entry, QueueEntry.State oldSate, QueueEntry.State newState)
+ public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState)
{
entry.removeStateChangeListener(this);
deliverAsync(_sub);
@@ -2241,32 +2366,32 @@ abstract class AbstractQueue<E extends Q
return ids;
}
- public long getTotalEnqueueSize()
+ public long getTotalEnqueuedBytes()
{
return _enqueueSize.get();
}
- public long getTotalDequeueSize()
+ public long getTotalDequeuedBytes()
{
return _dequeueSize.get();
}
- public long getPersistentByteEnqueues()
+ public long getPersistentEnqueuedBytes()
{
return _persistentMessageEnqueueSize.get();
}
- public long getPersistentByteDequeues()
+ public long getPersistentDequeuedBytes()
{
return _persistentMessageDequeueSize.get();
}
- public long getPersistentMsgEnqueues()
+ public long getPersistentEnqueuedMessages()
{
return _persistentMessageEnqueueCount.get();
}
- public long getPersistentMsgDequeues()
+ public long getPersistentDequeuedMessages()
{
return _persistentMessageDequeueCount.get();
}
@@ -2278,23 +2403,23 @@ abstract class AbstractQueue<E extends Q
return getName();
}
- public long getUnackedMessageCount()
+ public long getUnacknowledgedMessages()
{
return _unackedMsgCount.get();
}
- public long getUnackedMessageBytes()
+ public long getUnacknowledgedBytes()
{
return _unackedMsgBytes.get();
}
- public void decrementUnackedMsgCount(E queueEntry)
+ public void decrementUnackedMsgCount(QueueEntry queueEntry)
{
_unackedMsgCount.decrementAndGet();
_unackedMsgBytes.addAndGet(-queueEntry.getSize());
}
- private void incrementUnackedMsgCount(E entry)
+ private void incrementUnackedMsgCount(QueueEntry entry)
{
_unackedMsgCount.incrementAndGet();
_unackedMsgBytes.addAndGet(entry.getSize());
@@ -2305,7 +2430,8 @@ abstract class AbstractQueue<E extends Q
return _logActor;
}
- public int getMaximumDeliveryCount()
+ @Override
+ public int getMaximumDeliveryAttempts()
{
return _maximumDeliveryCount;
}
@@ -2321,12 +2447,15 @@ abstract class AbstractQueue<E extends Q
private void checkForNotification(ServerMessage<?> msg)
{
final Set<NotificationCheck> notificationChecks = getNotificationChecks();
- final AMQQueue.NotificationListener listener = _notificationListener;
-
+ QueueNotificationListener listener = _notificationListener;
+ if(listener == null)
+ {
+ listener = NULL_NOTIFICATION_LISTENER;
+ }
if(listener != null && !notificationChecks.isEmpty())
{
final long currentTime = System.currentTimeMillis();
- final long thresholdTime = currentTime - getMinimumAlertRepeatGap();
+ final long thresholdTime = currentTime - getAlertRepeatGap();
for (NotificationCheck check : notificationChecks)
{
@@ -2341,7 +2470,7 @@ abstract class AbstractQueue<E extends Q
}
}
- public void setNotificationListener(AMQQueue.NotificationListener listener)
+ public void setNotificationListener(QueueNotificationListener listener)
{
_notificationListener = listener;
}
@@ -2361,7 +2490,7 @@ abstract class AbstractQueue<E extends Q
public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction)
+ final Action<? super MessageInstance> postEnqueueAction)
{
txn.enqueue(this,message, new ServerTransaction.Action()
{
@@ -2612,4 +2741,371 @@ abstract class AbstractQueue<E extends Q
_deleteTask = deleteTask;
}
}
+
+ //=============
+
+
+ @Override
+ protected boolean setState(final State currentState, final State desiredState)
+ {
+ if(desiredState == State.DELETED)
+ {
+ _virtualHost.removeQueue(this);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public String getQueueType()
+ {
+ return null;
+ }
+
+ @Override
+ public ExclusivityPolicy getExclusive()
+ {
+ return _exclusivityPolicy;
+ }
+
+ @Override
+ public boolean getNoLocal()
+ {
+ return _noLocal;
+ }
+
+ @Override
+ public String getLvqKey()
+ {
+ return null;
+ }
+
+ @Override
+ public String getSortKey()
+ {
+ return null;
+ }
+
+ @Override
+ public String getMessageGroupKey()
+ {
+ return (String) getAttribute(MESSAGE_GROUP_KEY);
+ }
+
+ @Override
+ public int getMessageGroupSharedGroups()
+ {
+ return (Integer) getAttribute(MESSAGE_GROUP_SHARED_GROUPS);
+ }
+
+
+ @Override
+ public boolean isQueueFlowStopped()
+ {
+ return false;
+ }
+
+ @Override
+ public int getPriorities()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getBytesIn()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getBytesOut()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getMessagesIn()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getMessagesOut()
+ {
+ return 0;
+ }
+
+ @Override
+ public String setName(final String currentName, final String desiredName)
+ throws IllegalStateException, AccessControlException
+ {
+ return null;
+ }
+
+ @Override
+ public State getState()
+ {
+ return isDeleted() ? State.DELETED : State.ACTIVE;
+ }
+
+ @Override
+ public void setDurable(final boolean durable)
+ throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+
+ }
+
+ @Override
+ public LifetimePolicy setLifetimePolicy(final LifetimePolicy expected, final LifetimePolicy desired)
+ throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ return null;
+ }
+
+ @Override
+ public <C extends ConfiguredObject> Collection<C> getChildren(final Class<C> clazz)
+ {
+ if(clazz == Binding.class)
+ {
+ return (Collection<C>) getBindings();
+ }
+ else if(clazz == org.apache.qpid.server.model.Consumer.class)
+ {
+ return (Collection<C>) getConsumers();
+ }
+ else return Collections.emptySet();
+ }
+
+ @Override
+ public <T extends ConfiguredObject> T getParent(final Class<T> clazz)
+ {
+ if(clazz == org.apache.qpid.server.model.VirtualHost.class)
+ {
+ return (T) _virtualHost.getModel();
+ }
+ return super.getParent(clazz);
+ }
+
+ @Override
+ protected <C extends ConfiguredObject> C addChild(final Class<C> childClass,
+ final Map<String, Object> attributes,
+ final ConfiguredObject... otherParents)
+ {
+ if(childClass == Binding.class && otherParents.length == 1 && otherParents[0] instanceof Exchange)
+ {
+ final String bindingKey = (String) attributes.get("name");
+ ((NonDefaultExchange)otherParents[0]).addBinding(bindingKey, this, attributes);
+ for(Binding binding : _bindings)
+ {
+ if(binding.getExchange() == otherParents[0] && binding.getName().equals(bindingKey))
+ {
+ return (C) binding;
+ }
+ }
+ return null;
+ }
+ return super.addChild(childClass, attributes, otherParents);
+ }
+
+ @Override
+ public boolean changeAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ try
+ {
+ if(ALERT_REPEAT_GAP.equals(name))
+ {
+ setMinimumAlertRepeatGap((Long) desired);
+ return true;
+ }
+ else if(ALERT_THRESHOLD_MESSAGE_AGE.equals(name))
+ {
+ setMaximumMessageAge((Long) desired);
+ return true;
+ }
+ else if(ALERT_THRESHOLD_MESSAGE_SIZE.equals(name))
+ {
+ setMaximumMessageSize((Long) desired);
+ return true;
+ }
+ else if(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES.equals(name))
+ {
+ setMaximumQueueDepth((Long) desired);
+ return true;
+ }
+ else if(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES.equals(name))
+ {
+ setMaximumMessageCount((Long) desired);
+ return true;
+ }
+ else if(ALTERNATE_EXCHANGE.equals(name))
+ {
+ // In future we may want to accept a UUID as an alternative way to identifying the exchange
+ NonDefaultExchange alternateExchange = (NonDefaultExchange) desired;
+ setAlternateExchange(alternateExchange);
+ return true;
+ }
+ else if(EXCLUSIVE.equals(name))
+ {
+ ExclusivityPolicy desiredPolicy;
+ if(desired == null)
+ {
+ desiredPolicy = ExclusivityPolicy.NONE;
+ }
+ else if(desired instanceof ExclusivityPolicy)
+ {
+ desiredPolicy = (ExclusivityPolicy)desired;
+ }
+ else if (desired instanceof String)
+ {
+ desiredPolicy = ExclusivityPolicy.valueOf((String)desired);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Cannot set " + Queue.EXCLUSIVE + " property to type " + desired.getClass().getName());
+ }
+ try
+ {
+ setExclusivityPolicy(desiredPolicy);
+ }
+ catch (MessageSource.ExistingConsumerPreventsExclusive existingConsumerPreventsExclusive)
+ {
+ throw new IllegalArgumentException("Unable to set exclusivity policy to " + desired + " as an existing combinations of consumers prevents this");
+ }
+ return true;
+
+ }
+ else if(MESSAGE_GROUP_KEY.equals(name))
+ {
+ // TODO
+ }
+ else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name))
+ {
+ // TODO
+ }
+ else if(LVQ_KEY.equals(name))
+ {
+ // TODO
+ }
+ else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name))
+ {
+ setMaximumDeliveryCount((Integer) desired);
+ return true;
+ }
+ else if(NO_LOCAL.equals(name))
+ {
+ // TODO
+ }
+ else if(OWNER.equals(name))
+ {
+ // TODO
+ }
+ else if(QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name))
+ {
+ setCapacity((Long) desired);
+ return true;
+ }
+ else if(QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name))
+ {
+ setFlowResumeCapacity((Long) desired);
+ return true;
+ }
+ else if(QUEUE_FLOW_STOPPED.equals(name))
+ {
+ // TODO
+ }
+ else if(SORT_KEY.equals(name))
+ {
+ // TODO
+ }
+ else if(QUEUE_TYPE.equals(name))
+ {
+ // TODO
+ }
+ else if (DESCRIPTION.equals(name))
+ {
+ setDescription((String) desired);
+ return true;
+ }
+
+ return super.changeAttribute(name, expected, desired);
+ }
+ finally
+ {
+ if (isDurable())
+ {
+ DurableConfigurationStoreHelper.updateQueue(this.getVirtualHost().getDurableConfigurationStore(),
+ this);
+ }
+ }
+ }
+
+ @Override
+ public Collection<String> getAttributeNames()
+ {
+ return getAttributeNames(getClass());
+ }
+ @Override
+ protected void authoriseSetAttribute(String name, Object expected, Object desired) throws AccessControlException
+ {
+ _virtualHost.getSecurityManager().authoriseUpdate(this);
+ }
+
+ @Override
+ protected void authoriseSetAttributes(Map<String, Object> attributes) throws AccessControlException
+ {
+ _virtualHost.getSecurityManager().authoriseUpdate(this);
+ }
+
+ @SuppressWarnings("serial")
+ static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{
+ put(ALERT_REPEAT_GAP, Long.class);
+ put(ALERT_THRESHOLD_MESSAGE_AGE, Long.class);
+ put(ALERT_THRESHOLD_MESSAGE_SIZE, Long.class);
+ put(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, Long.class);
+ put(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, Long.class);
+ put(QUEUE_FLOW_CONTROL_SIZE_BYTES, Long.class);
+ put(QUEUE_FLOW_RESUME_SIZE_BYTES, Long.class);
+ put(MAXIMUM_DELIVERY_ATTEMPTS, Integer.class);
+ put(DESCRIPTION, String.class);
+ }});
+
+ @Override
+ protected void changeAttributes(final Map<String, Object> attributes)
+ {
+ Map<String, Object> convertedAttributes = MapValueConverter.convert(attributes, ATTRIBUTE_TYPES);
+ validateAttributes(convertedAttributes);
+
+ super.changeAttributes(convertedAttributes);
+ }
+
+ private void validateAttributes(Map<String, Object> convertedAttributes)
+ {
+ Long queueFlowControlSize = (Long) convertedAttributes.get(QUEUE_FLOW_CONTROL_SIZE_BYTES);
+ Long queueFlowControlResumeSize = (Long) convertedAttributes.get(QUEUE_FLOW_RESUME_SIZE_BYTES);
+ if (queueFlowControlSize != null || queueFlowControlResumeSize != null )
+ {
+ if (queueFlowControlSize == null)
+ {
+ queueFlowControlSize = (Long)getAttribute(QUEUE_FLOW_CONTROL_SIZE_BYTES);
+ }
+ if (queueFlowControlResumeSize == null)
+ {
+ queueFlowControlResumeSize = (Long)getAttribute(QUEUE_FLOW_RESUME_SIZE_BYTES);
+ }
+ if (queueFlowControlResumeSize > queueFlowControlSize)
+ {
+ throw new IllegalConfigurationException("Flow resume size can't be greater than flow control size");
+ }
+ }
+ for (Map.Entry<String, Object> entry: convertedAttributes.entrySet())
+ {
+ Object value = entry.getValue();
+ if (value instanceof Number && ((Number)value).longValue() < 0)
+ {
+ throw new IllegalConfigurationException("Only positive integer value can be specified for the attribute "
+ + entry.getKey());
+ }
+ }
+ }
+
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java Wed Feb 26 23:27:39 2014
@@ -28,13 +28,13 @@ import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
-public class AssignedConsumerMessageGroupManager<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>> implements MessageGroupManager<E,Q,L>
+public class AssignedConsumerMessageGroupManager implements MessageGroupManager
{
private static final Logger _logger = LoggerFactory.getLogger(AssignedConsumerMessageGroupManager.class);
private final String _groupId;
- private final ConcurrentHashMap<Integer, QueueConsumer> _groupMap = new ConcurrentHashMap<Integer, QueueConsumer>();
+ private final ConcurrentHashMap<Integer, QueueConsumer<?>> _groupMap = new ConcurrentHashMap<Integer, QueueConsumer<?>>();
private final int _groupMask;
public AssignedConsumerMessageGroupManager(final String groupId, final int maxGroups)
@@ -53,18 +53,18 @@ public class AssignedConsumerMessageGrou
return val;
}
- public QueueConsumer getAssignedConsumer(final E entry)
+ public QueueConsumer<?> getAssignedConsumer(final QueueEntry entry)
{
Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask);
}
- public boolean acceptMessage(QueueConsumer<?,E,Q,L> sub, E entry)
+ public boolean acceptMessage(QueueConsumer<?> sub, QueueEntry entry)
{
return assignMessage(sub, entry) && entry.acquire(sub);
}
- private boolean assignMessage(QueueConsumer sub, E entry)
+ private boolean assignMessage(QueueConsumer<?> sub, QueueEntry entry)
{
Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
if(groupVal == null)
@@ -98,24 +98,24 @@ public class AssignedConsumerMessageGrou
}
}
- public E findEarliestAssignedAvailableEntry(QueueConsumer sub)
+ public QueueEntry findEarliestAssignedAvailableEntry(QueueConsumer<?> sub)
{
EntryFinder visitor = new EntryFinder(sub);
sub.getQueue().visit(visitor);
return visitor.getEntry();
}
- private class EntryFinder implements QueueEntryVisitor<E>
+ private class EntryFinder implements QueueEntryVisitor
{
- private E _entry;
- private QueueConsumer _sub;
+ private QueueEntry _entry;
+ private QueueConsumer<?> _sub;
- public EntryFinder(final QueueConsumer sub)
+ public EntryFinder(final QueueConsumer<?> sub)
{
_sub = sub;
}
- public boolean visit(final E entry)
+ public boolean visit(final QueueEntry entry)
{
if(!entry.isAvailable())
{
@@ -129,7 +129,7 @@ public class AssignedConsumerMessageGrou
}
Integer group = groupId.hashCode() & _groupMask;
- Consumer assignedSub = _groupMap.get(group);
+ QueueConsumer<?> assignedSub = _groupMap.get(group);
if(assignedSub == _sub)
{
_entry = entry;
@@ -141,15 +141,15 @@ public class AssignedConsumerMessageGrou
}
}
- public E getEntry()
+ public QueueEntry getEntry()
{
return _entry;
}
}
- public void clearAssignments(QueueConsumer sub)
+ public void clearAssignments(QueueConsumer<?> sub)
{
- Iterator<QueueConsumer> subIter = _groupMap.values().iterator();
+ Iterator<QueueConsumer<?>> subIter = _groupMap.values().iterator();
while(subIter.hasNext())
{
if(subIter.next() == sub)
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Wed Feb 26 23:27:39 2014
@@ -27,9 +27,9 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
-public interface BaseQueue<C extends Consumer> extends TransactionLogResource
+public interface BaseQueue extends TransactionLogResource
{
- void enqueue(ServerMessage message, Action<? super MessageInstance<?,C>> action);
+ void enqueue(ServerMessage message, Action<? super MessageInstance> action);
boolean isDurable();
boolean isDeleted();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java Wed Feb 26 23:27:39 2014
@@ -28,7 +28,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class ConflationQueue extends AbstractQueue<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>
+public class ConflationQueue extends AbstractQueue
{
public static final String DEFAULT_LVQ_KEY = "qpid.LVQ_key";
@@ -52,7 +52,7 @@ public class ConflationQueue extends Abs
public String getConflationKey()
{
- return getEntries().getConflationKey();
+ return ((ConflationQueueList)getEntries()).getConflationKey();
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Wed Feb 26 23:27:39 2014
@@ -32,17 +32,17 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
-public class ConflationQueueList extends OrderedQueueEntryList<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>
+public class ConflationQueueList extends OrderedQueueEntryList
{
private static final Logger LOGGER = LoggerFactory.getLogger(ConflationQueueList.class);
- private static final HeadCreator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList> HEAD_CREATOR = new HeadCreator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>()
+ private static final HeadCreator HEAD_CREATOR = new HeadCreator()
{
@Override
- public ConflationQueueEntry createHead(final ConflationQueueList list)
+ public ConflationQueueEntry createHead(final QueueEntryList list)
{
- return list.createHead();
+ return ((ConflationQueueList)list).createHead();
}
};
@@ -75,13 +75,15 @@ public class ConflationQueueList extends
return new ConflationQueueEntry(this, message);
}
+
+
/**
* Updates the list using super.add and also updates {@link #_latestValuesMap} and discards entries as necessary.
*/
@Override
public ConflationQueueEntry add(final ServerMessage message)
{
- final ConflationQueueEntry addedEntry = super.add(message);
+ final ConflationQueueEntry addedEntry = (ConflationQueueEntry) super.add(message);
final Object keyValue = message.getMessageHeader().getHeader(_conflationKey);
if (keyValue != null)
@@ -192,7 +194,7 @@ public class ConflationQueueList extends
}
}
- final class ConflationQueueEntry extends OrderedQueueEntry<ConflationQueueEntry, ConflationQueue, ConflationQueueList>
+ final class ConflationQueueEntry extends OrderedQueueEntry
{
private AtomicReference<ConflationQueueEntry> _latestValueReference;
@@ -252,7 +254,7 @@ public class ConflationQueueList extends
return Collections.unmodifiableMap(_latestValuesMap);
}
- static class Factory implements QueueEntryListFactory<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>
+ static class Factory implements QueueEntryListFactory
{
private final String _conflationKey;
@@ -262,9 +264,9 @@ public class ConflationQueueList extends
}
@Override
- public ConflationQueueList createQueueEntryList(final ConflationQueue queue)
+ public ConflationQueueList createQueueEntryList(final AMQQueue<?> queue)
{
- return new ConflationQueueList(queue, _conflationKey);
+ return new ConflationQueueList((ConflationQueue)queue, _conflationKey);
}
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java Wed Feb 26 23:27:39 2014
@@ -32,22 +32,22 @@ import org.apache.qpid.server.message.Se
import java.util.HashMap;
import java.util.Map;
-public class DefinedGroupMessageGroupManager<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>> implements MessageGroupManager<E,Q,L>
+public class DefinedGroupMessageGroupManager implements MessageGroupManager
{
private static final Logger _logger = LoggerFactory.getLogger(DefinedGroupMessageGroupManager.class);
private final String _groupId;
private final String _defaultGroup;
private final Map<Object, Group> _groupMap = new HashMap<Object, Group>();
- private final ConsumerResetHelper<E,Q,L> _resetHelper;
+ private final ConsumerResetHelper _resetHelper;
private final class Group
{
private final Object _group;
- private QueueConsumer<?,E,Q,L> _consumer;
+ private QueueConsumer<?> _consumer;
private int _activeCount;
- private Group(final Object key, final QueueConsumer<?,E,Q,L> consumer)
+ private Group(final Object key, final QueueConsumer<?> consumer)
{
_group = key;
_consumer = consumer;
@@ -104,7 +104,7 @@ public class DefinedGroupMessageGroupMan
return !(_consumer == null || (_activeCount == 0 && _consumer.isClosed()));
}
- public QueueConsumer<?,E,Q,L> getConsumer()
+ public QueueConsumer<?> getConsumer()
{
return _consumer;
}
@@ -120,14 +120,14 @@ public class DefinedGroupMessageGroupMan
}
}
- public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, ConsumerResetHelper<E,Q,L> resetHelper)
+ public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, ConsumerResetHelper resetHelper)
{
_groupId = groupId;
_defaultGroup = defaultGroup;
_resetHelper = resetHelper;
}
- public synchronized QueueConsumer<?,E,Q,L> getAssignedConsumer(final E entry)
+ public synchronized QueueConsumer<?> getAssignedConsumer(final QueueEntry entry)
{
Object groupId = getKey(entry);
@@ -135,12 +135,12 @@ public class DefinedGroupMessageGroupMan
return group == null || !group.isValid() ? null : group.getConsumer();
}
- public synchronized boolean acceptMessage(final QueueConsumer<?,E,Q,L> sub, final E entry)
+ public synchronized boolean acceptMessage(final QueueConsumer<?> sub, final QueueEntry entry)
{
return assignMessage(sub, entry) && entry.acquire(sub);
}
- private boolean assignMessage(final QueueConsumer<?,E,Q,L> sub, final E entry)
+ private boolean assignMessage(final QueueConsumer<?> sub, final QueueEntry entry)
{
Object groupId = getKey(entry);
Group group = _groupMap.get(groupId);
@@ -173,24 +173,24 @@ public class DefinedGroupMessageGroupMan
}
}
- public synchronized E findEarliestAssignedAvailableEntry(final QueueConsumer<?,E,Q,L> sub)
+ public synchronized QueueEntry findEarliestAssignedAvailableEntry(final QueueConsumer<?> sub)
{
EntryFinder visitor = new EntryFinder(sub);
sub.getQueue().visit(visitor);
return visitor.getEntry();
}
- private class EntryFinder implements QueueEntryVisitor<E>
+ private class EntryFinder implements QueueEntryVisitor
{
- private E _entry;
+ private QueueEntry _entry;
private QueueConsumer _sub;
- public EntryFinder(final QueueConsumer sub)
+ public EntryFinder(final QueueConsumer<?> sub)
{
_sub = sub;
}
- public boolean visit(final E entry)
+ public boolean visit(final QueueEntry entry)
{
if(!entry.isAvailable())
{
@@ -211,18 +211,18 @@ public class DefinedGroupMessageGroupMan
}
}
- public E getEntry()
+ public QueueEntry getEntry()
{
return _entry;
}
}
- public void clearAssignments(final QueueConsumer sub)
+ public void clearAssignments(final QueueConsumer<?> sub)
{
}
- private Object getKey(E entry)
+ private Object getKey(QueueEntry entry)
{
ServerMessage message = entry.getMessage();
AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader();
@@ -234,7 +234,7 @@ public class DefinedGroupMessageGroupMan
return groupVal;
}
- private class GroupStateChangeListener implements StateChangeListener<MessageInstance<?, ? extends QueueConsumer>, QueueEntry.State>
+ private class GroupStateChangeListener implements StateChangeListener<MessageInstance, MessageInstance.State>
{
private final Group _group;
@@ -243,7 +243,7 @@ public class DefinedGroupMessageGroupMan
_group = group;
}
- public void stateChanged(final MessageInstance<?, ? extends QueueConsumer> entry,
+ public void stateChanged(final MessageInstance entry,
final MessageInstance.State oldState,
final MessageInstance.State newState)
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java Wed Feb 26 23:27:39 2014
@@ -20,20 +20,20 @@
*/
package org.apache.qpid.server.queue;
-public interface MessageGroupManager<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>>
+public interface MessageGroupManager
{
- public interface ConsumerResetHelper<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>>
+ public interface ConsumerResetHelper
{
- public void resetSubPointersForGroups(QueueConsumer<?,E,Q,L> consumer, boolean clearAssignments);
+ public void resetSubPointersForGroups(QueueConsumer<?> consumer, boolean clearAssignments);
- boolean isEntryAheadOfConsumer(E entry, QueueConsumer<?,E,Q,L> sub);
+ boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer<?> sub);
}
- QueueConsumer getAssignedConsumer(E entry);
+ QueueConsumer getAssignedConsumer(QueueEntry entry);
- boolean acceptMessage(QueueConsumer<?,E,Q,L> sub, E entry);
+ boolean acceptMessage(QueueConsumer<?> sub, QueueEntry entry);
- E findEarliestAssignedAvailableEntry(QueueConsumer<?,E,Q,L> sub);
+ QueueEntry findEarliestAssignedAvailableEntry(QueueConsumer<?> sub);
- void clearAssignments(QueueConsumer sub);
+ void clearAssignments(QueueConsumer<?> sub);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org