You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2014/02/12 14:27:57 UTC
svn commit: r1567616 [5/12] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/
qpid/cpp/bindings/qmf2/ruby/ qpid/cpp/bindings/qpid/examples/perl/
qpid/cpp/bindings/qpid/perl/ qpid/cpp/bindings/qpid/perl/lib/qpid/messaging/
qpid/cpp/bindings/qpid/ruby/ qp...
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Wed Feb 12 13:27:51 2014
@@ -26,19 +26,20 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.protocol.CapacityChecker;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
import java.util.List;
import java.util.Set;
-public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue
+public interface AMQQueue<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, C extends Consumer>
+ extends Comparable<Q>, ExchangeReferrer, BaseQueue<C>, MessageSource<C,Q>, CapacityChecker, MessageDestination
{
- String getName();
public interface NotificationListener
{
@@ -65,45 +66,20 @@ public interface AMQQueue extends Compar
long getTotalEnqueueCount();
- public interface Context
- {
- QueueEntry getLastSeenEntry();
- }
-
void setNoLocal(boolean b);
boolean isAutoDelete();
String getOwner();
- AuthorizationHolder getAuthorizationHolder();
- void setAuthorizationHolder(AuthorizationHolder principalHolder);
-
- void setExclusiveOwningSession(AMQSessionModel owner);
- AMQSessionModel getExclusiveOwningSession();
VirtualHost getVirtualHost();
- void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException;
-
- void unregisterSubscription(final Subscription subscription) throws AMQException;
-
- Collection<Subscription> getConsumers();
-
- interface SubscriptionRegistrationListener
- {
- void subscriptionRegistered(AMQQueue queue, Subscription subscription);
- void subscriptionUnregistered(AMQQueue queue, Subscription subscription);
- }
-
- void addSubscriptionRegistrationListener(SubscriptionRegistrationListener listener);
- void removeSubscriptionRegistrationListener(SubscriptionRegistrationListener listener);
-
int getConsumerCount();
int getActiveConsumerCount();
- boolean hasExclusiveSubscriber();
+ boolean hasExclusiveConsumer();
boolean isUnused();
@@ -111,41 +87,35 @@ public interface AMQQueue extends Compar
int getMessageCount();
- int getUndeliveredMessageCount();
-
long getQueueDepth();
- long getReceivedMessageCount();
-
long getOldestMessageArrivalTime();
boolean isDeleted();
int delete() throws AMQException;
- void requeue(QueueEntry entry);
+ void requeue(E entry);
- void dequeue(QueueEntry entry, Subscription sub);
+ void dequeue(E entry);
- void decrementUnackedMsgCount(QueueEntry queueEntry);
+ void decrementUnackedMsgCount(E queueEntry);
- boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
+ boolean resend(final E entry, final C consumer) throws AMQException;
- void addQueueDeleteTask(final Task task);
- void removeQueueDeleteTask(final Task task);
+ void addQueueDeleteTask(Action<AMQQueue> task);
+ void removeQueueDeleteTask(Action<AMQQueue> task);
- List<QueueEntry> getMessagesOnTheQueue();
-
- List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId);
+ List<E> getMessagesOnTheQueue();
List<Long> getMessagesOnTheQueue(int num);
List<Long> getMessagesOnTheQueue(int num, int offset);
- QueueEntry getMessageOnTheQueue(long messageId);
+ E getMessageOnTheQueue(long messageId);
/**
* Returns a list of QueueEntries from a given range of queue positions, e.g. messages 5 to 10 on the queue.
@@ -156,9 +126,9 @@ public interface AMQQueue extends Compar
* @param toPosition
* @return
*/
- public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition);
+ public List<E> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition);
- void visit(QueueEntryVisitor visitor);
+ void visit(QueueEntryVisitor<E> visitor);
long getMaximumMessageSize();
@@ -209,16 +179,10 @@ public interface AMQQueue extends Compar
Set<NotificationCheck> getNotificationChecks();
- void flushSubscription(final Subscription sub) throws AMQException;
-
- void deliverAsync(final Subscription sub);
-
void deliverAsync();
void stop();
- boolean isExclusive();
-
Exchange getAlternateExchange();
void setAlternateExchange(Exchange exchange);
@@ -226,56 +190,6 @@ public interface AMQQueue extends Compar
Collection<String> getAvailableAttributes();
Object getAttribute(String attrName);
- void checkCapacity(AMQSessionModel channel);
-
- /**
- * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
- * already exists.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Move to top level, used outside this class.
- */
- static final class ExistingExclusiveSubscription extends AMQException
- {
-
- public ExistingExclusiveSubscription()
- {
- super("");
- }
- }
-
- /**
- * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusive subscription, as a subscription
- * already exists.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create an exclusive subscription, as a subscription already exists.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Move to top level, used outside this class.
- */
- static final class ExistingSubscriptionPreventsExclusive extends AMQException
- {
- public ExistingSubscriptionPreventsExclusive()
- {
- super("");
- }
- }
-
- static interface Task
- {
- public void doTask(AMQQueue queue) throws AMQException;
- }
-
void configure(QueueConfiguration config);
void setExclusive(boolean exclusive);
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Wed Feb 12 13:27:51 2014
@@ -288,11 +288,11 @@ public class AMQQueueFactory implements
}
else if(priorities > 1)
{
- q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities);
+ q = new PriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities);
}
else
{
- q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments);
+ q = new StandardQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments);
}
q.setDeleteOnNoConsumers(deleteOnNoConsumer);
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Wed Feb 12 13:27:51 2014
@@ -22,19 +22,15 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.util.Action;
-public interface BaseQueue extends TransactionLogResource
+public interface BaseQueue<C extends Consumer> extends TransactionLogResource
{
- public static interface PostEnqueueAction
- {
- public void onEnqueue(QueueEntry entry);
- }
-
- void enqueue(ServerMessage message) throws AMQException;
- void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException;
- void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException;
+ void enqueue(ServerMessage message, Action<? super MessageInstance<?,C>> action) throws AMQException;
boolean isDurable();
boolean isDeleted();
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java Wed Feb 12 13:27:51 2014
@@ -26,7 +26,7 @@ import java.util.UUID;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class ConflationQueue extends SimpleAMQQueue
+public class ConflationQueue extends SimpleAMQQueue<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>
{
protected ConflationQueue(UUID id,
String name,
@@ -42,7 +42,7 @@ public class ConflationQueue extends Sim
public String getConflationKey()
{
- return ((ConflationQueueList) getEntries()).getConflationKey();
+ return getEntries().getConflationKey();
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Wed Feb 12 13:27:51 2014
@@ -32,23 +32,38 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
-public class ConflationQueueList extends SimpleQueueEntryList
+public class ConflationQueueList extends OrderedQueueEntryList<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>
{
private static final Logger LOGGER = LoggerFactory.getLogger(ConflationQueueList.class);
+ private static final HeadCreator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList> HEAD_CREATOR = new HeadCreator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>()
+ {
+
+ @Override
+ public ConflationQueueEntry createHead(final ConflationQueueList list)
+ {
+ return list.createHead();
+ }
+ };
+
private final String _conflationKey;
- private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> _latestValuesMap =
- new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>();
+ private final ConcurrentHashMap<Object, AtomicReference<ConflationQueueEntry>> _latestValuesMap =
+ new ConcurrentHashMap<Object, AtomicReference<ConflationQueueEntry>>();
- private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this);
- private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this);
+ private final ConflationQueueEntry _deleteInProgress = new ConflationQueueEntry(this);
+ private final ConflationQueueEntry _newerEntryAlreadyBeenAndGone = new ConflationQueueEntry(this);
- public ConflationQueueList(AMQQueue queue, String conflationKey)
+ public ConflationQueueList(ConflationQueue queue, String conflationKey)
{
- super(queue);
+ super(queue, HEAD_CREATOR);
_conflationKey = conflationKey;
}
+ private ConflationQueueEntry createHead()
+ {
+ return new ConflationQueueEntry(this);
+ }
+
public String getConflationKey()
{
return _conflationKey;
@@ -66,7 +81,7 @@ public class ConflationQueueList extends
@Override
public ConflationQueueEntry add(final ServerMessage message)
{
- final ConflationQueueEntry addedEntry = (ConflationQueueEntry) (super.add(message));
+ final ConflationQueueEntry addedEntry = super.add(message);
final Object keyValue = message.getMessageHeader().getHeader(_conflationKey);
if (keyValue != null)
@@ -76,14 +91,14 @@ public class ConflationQueueList extends
LOGGER.debug("Adding entry " + addedEntry + " for message " + message.getMessageNumber() + " with conflation key " + keyValue);
}
- final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(addedEntry);
- AtomicReference<QueueEntry> entryReferenceFromMap = null;
- QueueEntry entryFromMap;
+ final AtomicReference<ConflationQueueEntry> referenceToEntry = new AtomicReference<ConflationQueueEntry>(addedEntry);
+ AtomicReference<ConflationQueueEntry> entryReferenceFromMap;
+ ConflationQueueEntry entryFromMap;
// Iterate until we have got a valid atomic reference object and either the referent is newer than the current
// entry, or the current entry has replaced it in the reference. Note that the _deletedEntryPlaceholder is a special value
// indicating that the reference object is no longer valid (it is being removed from the map).
- boolean keepTryingToUpdateEntryReference = true;
+ boolean keepTryingToUpdateEntryReference;
do
{
do
@@ -139,16 +154,16 @@ public class ConflationQueueList extends
* adds and removes during execution of this method.</li>
* </ul>
*/
- private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToAddedValue)
+ private AtomicReference<ConflationQueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<ConflationQueueEntry> referenceToAddedValue)
{
- AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue);
+ AtomicReference<ConflationQueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue);
if(latestValueReference == null)
{
latestValueReference = _latestValuesMap.get(key);
if(latestValueReference == null)
{
- return new AtomicReference<QueueEntry>(_newerEntryAlreadyBeenAndGone);
+ return new AtomicReference<ConflationQueueEntry>(_newerEntryAlreadyBeenAndGone);
}
}
return latestValueReference;
@@ -177,12 +192,17 @@ public class ConflationQueueList extends
}
}
- private final class ConflationQueueEntry extends SimpleQueueEntryImpl
+ final class ConflationQueueEntry extends OrderedQueueEntry<ConflationQueueEntry, ConflationQueue, ConflationQueueList>
{
- private AtomicReference<QueueEntry> _latestValueReference;
+ private AtomicReference<ConflationQueueEntry> _latestValueReference;
+
+ private ConflationQueueEntry(final ConflationQueueList queueEntryList)
+ {
+ super(queueEntryList);
+ }
- public ConflationQueueEntry(SimpleQueueEntryList queueEntryList, ServerMessage message)
+ public ConflationQueueEntry(ConflationQueueList queueEntryList, ServerMessage message)
{
super(queueEntryList, message);
}
@@ -206,7 +226,7 @@ public class ConflationQueueList extends
}
- public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference)
+ public void setLatestValueReference(final AtomicReference<ConflationQueueEntry> latestValueReference)
{
_latestValueReference = latestValueReference;
}
@@ -227,12 +247,12 @@ public class ConflationQueueList extends
/**
* Exposed for the purposes of unit testing only.
*/
- Map<Object, AtomicReference<QueueEntry>> getLatestValuesMap()
+ Map<Object, AtomicReference<ConflationQueueEntry>> getLatestValuesMap()
{
return Collections.unmodifiableMap(_latestValuesMap);
}
- static class Factory implements QueueEntryListFactory
+ static class Factory implements QueueEntryListFactory<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>
{
private final String _conflationKey;
@@ -241,7 +261,8 @@ public class ConflationQueueList extends
_conflationKey = conflationKey;
}
- public ConflationQueueList createQueueEntryList(AMQQueue queue)
+ @Override
+ public ConflationQueueList createQueueEntryList(final ConflationQueue queue)
{
return new ConflationQueueList(queue, _conflationKey);
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java Wed Feb 12 13:27:51 2014
@@ -20,37 +20,35 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
import java.util.UUID;
-public abstract class OutOfOrderQueue extends SimpleAMQQueue
+public abstract class OutOfOrderQueue<E extends QueueEntryImpl<E,Q,L>, Q extends OutOfOrderQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends SimpleAMQQueue<E,Q,L>
{
protected OutOfOrderQueue(UUID id, String name, boolean durable,
String owner, boolean autoDelete, boolean exclusive,
- VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
+ VirtualHost virtualHost, QueueEntryListFactory<E,Q,L> entryListFactory, Map<String, Object> arguments)
{
super(id, name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments);
}
@Override
- protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+ protected void checkConsumersNotAheadOfDelivery(final E entry)
{
- // check that all subscriptions are not in advance of the entry
- SubscriptionList.SubscriptionNodeIterator subIter = getSubscriptionList().iterator();
+ // check that all consumers are not in advance of the entry
+ QueueConsumerList.ConsumerNodeIterator<E,Q,L> subIter = getConsumerList().iterator();
while(subIter.advance() && !entry.isAcquired())
{
- final Subscription subscription = subIter.getNode().getSubscription();
- if(!subscription.isClosed())
+ final QueueConsumer<?,E,Q,L> consumer = subIter.getNode().getConsumer();
+ if(!consumer.isClosed())
{
- QueueContext context = (QueueContext) subscription.getQueueContext();
+ QueueContext<E,Q,L> context = consumer.getQueueContext();
if(context != null)
{
- QueueEntry released = context.getReleasedEntry();
+ E released = context.getReleasedEntry();
while(!entry.isAcquired() && (released == null || released.compareTo(entry) > 0))
{
if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Wed Feb 12 13:27:51 2014
@@ -22,132 +22,162 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
-public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl>
+abstract public class PriorityQueueList extends OrderedQueueEntryList<PriorityQueueList.PriorityQueueEntry, PriorityQueue, PriorityQueueList>
{
- private final AMQQueue _queue;
- private final PriorityQueueEntrySubList[] _priorityLists;
- private final int _priorities;
- private final int _priorityOffset;
- public PriorityQueueList(AMQQueue queue, int priorities)
- {
- _queue = queue;
- _priorityLists = new PriorityQueueEntrySubList[priorities];
- _priorities = priorities;
- _priorityOffset = 5-((priorities + 1)/2);
- for(int i = 0; i < priorities; i++)
- {
- _priorityLists[i] = new PriorityQueueEntrySubList(queue, i);
- }
- }
- public int getPriorities()
+ public PriorityQueueList(final PriorityQueue queue,
+ final HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList> headCreator)
{
- return _priorities;
+ super(queue, headCreator);
}
- public AMQQueue getQueue()
+ static class PriorityQueueMasterList extends PriorityQueueList
{
- return _queue;
- }
+ private static final HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList> DUMMY_HEAD_CREATOR =
+ new HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList>()
+ {
+ @Override
+ public PriorityQueueEntry createHead(final PriorityQueueList list)
+ {
+ return null;
+ }
+ };
+ private final PriorityQueue _queue;
+ private final PriorityQueueEntrySubList[] _priorityLists;
+ private final int _priorities;
+ private final int _priorityOffset;
- public SimpleQueueEntryImpl add(ServerMessage message)
- {
- int index = message.getMessageHeader().getPriority() - _priorityOffset;
- if(index >= _priorities)
+ public PriorityQueueMasterList(PriorityQueue queue, int priorities)
{
- index = _priorities-1;
+ super(queue, DUMMY_HEAD_CREATOR);
+ _queue = queue;
+ _priorityLists = new PriorityQueueEntrySubList[priorities];
+ _priorities = priorities;
+ _priorityOffset = 5-((priorities + 1)/2);
+ for(int i = 0; i < priorities; i++)
+ {
+ _priorityLists[i] = new PriorityQueueEntrySubList(queue, i);
+ }
}
- else if(index < 0)
+
+ public int getPriorities()
{
- index = 0;
+ return _priorities;
}
- return _priorityLists[index].add(message);
-
- }
-
- public SimpleQueueEntryImpl next(SimpleQueueEntryImpl node)
- {
- SimpleQueueEntryImpl next = node.getNextValidEntry();
- if(next == null)
+ public PriorityQueue getQueue()
{
- final QueueEntryList<?> nodeEntryList = node.getQueueEntryList();
- int index;
- for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--) {};
+ return _queue;
+ }
- while(next == null && index != 0)
+ public PriorityQueueEntry add(ServerMessage message)
+ {
+ int index = message.getMessageHeader().getPriority() - _priorityOffset;
+ if(index >= _priorities)
+ {
+ index = _priorities-1;
+ }
+ else if(index < 0)
{
- index--;
- next = _priorityLists[index].getHead().getNextValidEntry();
+ index = 0;
}
+ return _priorityLists[index].add(message);
}
- return next;
- }
- private final class PriorityQueueEntryListIterator implements QueueEntryIterator<SimpleQueueEntryImpl>
- {
- private final SimpleQueueEntryList.QueueEntryIteratorImpl[] _iterators = new SimpleQueueEntryList.QueueEntryIteratorImpl[ _priorityLists.length ];
- private SimpleQueueEntryImpl _lastNode;
+ @Override
+ protected PriorityQueueEntry createQueueEntry(final ServerMessage<?> message)
+ {
+ throw new UnsupportedOperationException();
+ }
- PriorityQueueEntryListIterator()
+ public PriorityQueueEntry next(PriorityQueueEntry node)
{
- for(int i = 0; i < _priorityLists.length; i++)
+ PriorityQueueEntry next = node.getNextValidEntry();
+
+ if(next == null)
{
- _iterators[i] = _priorityLists[i].iterator();
+ final PriorityQueueList nodeEntryList = node.getQueueEntryList();
+ int index;
+ for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--)
+ {
+ // do nothing; the loop exists only to find the index
+ }
+
+ while(next == null && index != 0)
+ {
+ index--;
+ next = _priorityLists[index].getHead().getNextValidEntry();
+ }
+
}
- _lastNode = _iterators[_iterators.length - 1].getNode();
+ return next;
}
-
- public boolean atTail()
+ private final class PriorityQueueEntryListIterator implements QueueEntryIterator<PriorityQueueEntry, PriorityQueue, PriorityQueueList, QueueConsumer<?,PriorityQueueEntry, PriorityQueue, PriorityQueueList>>
{
- for(int i = 0; i < _iterators.length; i++)
+ private final QueueEntryIterator<PriorityQueueEntry, PriorityQueue, PriorityQueueList,QueueConsumer<?,PriorityQueueEntry, PriorityQueue, PriorityQueueList>>[] _iterators = new QueueEntryIterator[ _priorityLists.length ];
+ private PriorityQueueEntry _lastNode;
+
+ PriorityQueueEntryListIterator()
{
- if(!_iterators[i].atTail())
+ for(int i = 0; i < _priorityLists.length; i++)
{
- return false;
+ _iterators[i] = _priorityLists[i].iterator();
}
+ _lastNode = _iterators[_iterators.length - 1].getNode();
}
- return true;
- }
- public SimpleQueueEntryImpl getNode()
- {
- return _lastNode;
- }
- public boolean advance()
- {
- for(int i = _iterators.length-1; i >= 0; i--)
+ public boolean atTail()
{
- if(_iterators[i].advance())
+ for(int i = 0; i < _iterators.length; i++)
{
- _lastNode = _iterators[i].getNode();
- return true;
+ if(!_iterators[i].atTail())
+ {
+ return false;
+ }
}
+ return true;
+ }
+
+ public PriorityQueueEntry getNode()
+ {
+ return _lastNode;
+ }
+
+ public boolean advance()
+ {
+ for(int i = _iterators.length-1; i >= 0; i--)
+ {
+ if(_iterators[i].advance())
+ {
+ _lastNode = _iterators[i].getNode();
+ return true;
+ }
+ }
+ return false;
}
- return false;
}
- }
- public PriorityQueueEntryListIterator iterator()
- {
- return new PriorityQueueEntryListIterator();
- }
+ public PriorityQueueEntryListIterator iterator()
+ {
- public SimpleQueueEntryImpl getHead()
- {
- return _priorityLists[_priorities-1].getHead();
- }
+ return new PriorityQueueEntryListIterator();
+ }
- public void entryDeleted(final SimpleQueueEntryImpl queueEntry)
- {
+ public PriorityQueueEntry getHead()
+ {
+ return _priorityLists[_priorities-1].getHead();
+ }
- }
+ public void entryDeleted(final PriorityQueueEntry queueEntry)
+ {
- static class Factory implements QueueEntryListFactory
+ }
+ }
+ static class Factory implements QueueEntryListFactory<PriorityQueueEntry, PriorityQueue, PriorityQueueList>
{
private final int _priorities;
@@ -156,26 +186,34 @@ public class PriorityQueueList implement
_priorities = priorities;
}
- public PriorityQueueList createQueueEntryList(AMQQueue queue)
+ public PriorityQueueList createQueueEntryList(PriorityQueue queue)
{
- return new PriorityQueueList(queue, _priorities);
+ return new PriorityQueueMasterList(queue, _priorities);
}
}
- private static class PriorityQueueEntrySubList extends SimpleQueueEntryList
+ static class PriorityQueueEntrySubList extends PriorityQueueList
{
+ private static final HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList> HEAD_CREATOR = new HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList>()
+ {
+ @Override
+ public PriorityQueueEntry createHead(final PriorityQueueList list)
+ {
+ return new PriorityQueueEntry(list);
+ }
+ };
private int _listPriority;
- public PriorityQueueEntrySubList(AMQQueue queue, int listPriority)
+ public PriorityQueueEntrySubList(PriorityQueue queue, int listPriority)
{
- super(queue);
+ super(queue, HEAD_CREATOR);
_listPriority = listPriority;
}
@Override
- protected PriorityQueueEntryImpl createQueueEntry(ServerMessage<?> message)
+ protected PriorityQueueEntry createQueueEntry(ServerMessage<?> message)
{
- return new PriorityQueueEntryImpl(this, message);
+ return new PriorityQueueEntry(this, message);
}
public int getListPriority()
@@ -184,17 +222,22 @@ public class PriorityQueueList implement
}
}
- private static class PriorityQueueEntryImpl extends SimpleQueueEntryImpl
+ static class PriorityQueueEntry extends OrderedQueueEntry<PriorityQueueEntry, PriorityQueue, PriorityQueueList>
{
- public PriorityQueueEntryImpl(PriorityQueueEntrySubList queueEntryList, ServerMessage<?> message)
+ private PriorityQueueEntry(final PriorityQueueList queueEntryList)
+ {
+ super(queueEntryList);
+ }
+
+ public PriorityQueueEntry(PriorityQueueEntrySubList queueEntryList, ServerMessage<?> message)
{
super(queueEntryList, message);
}
@Override
- public int compareTo(final QueueEntry o)
+ public int compareTo(final PriorityQueueEntry o)
{
- PriorityQueueEntrySubList pqel = (PriorityQueueEntrySubList)((PriorityQueueEntryImpl)o).getQueueEntryList();
+ PriorityQueueEntrySubList pqel = (PriorityQueueEntrySubList)o.getQueueEntryList();
int otherPriority = pqel.getListPriority();
int thisPriority = ((PriorityQueueEntrySubList) getQueueEntryList()).getListPriority();
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java Wed Feb 12 13:27:51 2014
@@ -23,32 +23,32 @@ package org.apache.qpid.server.queue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-final class QueueContext implements AMQQueue.Context
+final class QueueContext<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
{
- private volatile QueueEntry _lastSeenEntry;
- private volatile QueueEntry _releasedEntry;
+ private volatile E _lastSeenEntry;
+ private volatile E _releasedEntry;
- static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+ static final AtomicReferenceFieldUpdater<QueueContext, QueueEntryImpl>
_lastSeenUpdater =
AtomicReferenceFieldUpdater.newUpdater
- (QueueContext.class, QueueEntry.class, "_lastSeenEntry");
- static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+ (QueueContext.class, QueueEntryImpl.class, "_lastSeenEntry");
+ static final AtomicReferenceFieldUpdater<QueueContext, QueueEntryImpl>
_releasedUpdater =
AtomicReferenceFieldUpdater.newUpdater
- (QueueContext.class, QueueEntry.class, "_releasedEntry");
+ (QueueContext.class, QueueEntryImpl.class, "_releasedEntry");
- public QueueContext(QueueEntry head)
+ public QueueContext(E head)
{
_lastSeenEntry = head;
}
- public QueueEntry getLastSeenEntry()
+ public E getLastSeenEntry()
{
return _lastSeenEntry;
}
- QueueEntry getReleasedEntry()
+ E getReleasedEntry()
{
return _releasedEntry;
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Wed Feb 12 13:27:51 2014
@@ -20,207 +20,21 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.ServerTransaction;
-public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
+public interface QueueEntry<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, C extends Consumer> extends MessageInstance<E,C>, Comparable<E>
{
-
-
- public static enum State
- {
- AVAILABLE,
- ACQUIRED,
- EXPIRED,
- DEQUEUED,
- DELETED;
-
-
- }
-
- public static interface StateChangeListener
- {
- public void stateChanged(QueueEntry entry, State oldSate, State newState);
- }
-
- public abstract class EntryState
- {
- private EntryState()
- {
- }
-
- public abstract State getState();
-
- /**
- * Returns true if state is either DEQUEUED or DELETED.
- *
- * @return true if state is either DEQUEUED or DELETED.
- */
- public boolean isDispensed()
- {
- State currentState = getState();
- return currentState == State.DEQUEUED || currentState == State.DELETED;
- }
- }
-
-
- public final class AvailableState extends EntryState
- {
-
- public State getState()
- {
- return State.AVAILABLE;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
-
- public final class DequeuedState extends EntryState
- {
-
- public State getState()
- {
- return State.DEQUEUED;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
-
- public final class DeletedState extends EntryState
- {
-
- public State getState()
- {
- return State.DELETED;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
- public final class ExpiredState extends EntryState
- {
-
- public State getState()
- {
- return State.EXPIRED;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
-
- public final class NonSubscriptionAcquiredState extends EntryState
- {
- public State getState()
- {
- return State.ACQUIRED;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
- public final class SubscriptionAcquiredState extends EntryState
- {
- private final Subscription _subscription;
-
- public SubscriptionAcquiredState(Subscription subscription)
- {
- _subscription = subscription;
- }
-
-
- public State getState()
- {
- return State.ACQUIRED;
- }
-
- public Subscription getSubscription()
- {
- return _subscription;
- }
-
- public String toString()
- {
- return "{" + getState().name() + " : " + _subscription +"}";
- }
- }
-
-
- final static EntryState AVAILABLE_STATE = new AvailableState();
- final static EntryState DELETED_STATE = new DeletedState();
- final static EntryState DEQUEUED_STATE = new DequeuedState();
- final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
-
-
-
-
- AMQQueue getQueue();
+ Q getQueue();
long getSize();
- boolean getDeliveredToConsumer();
-
- boolean expired() throws AMQException;
-
- boolean acquire(Subscription sub);
-
- boolean acquiredBySubscription();
- boolean isAcquiredBy(Subscription subscription);
-
- void setRedelivered();
-
- boolean isRedelivered();
-
- Subscription getDeliveredSubscription();
-
- void reject();
-
- boolean isRejectedBy(long subscriptionId);
-
- int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn);
-
boolean isQueueDeleted();
- QueueEntry getNextNode();
-
- QueueEntry getNextValidEntry();
-
- void addStateChangeListener(StateChangeListener listener);
- boolean removeStateChangeListener(StateChangeListener listener);
-
-
- /**
- * Number of times this queue entry has been delivered.
- *
- * @return delivery count
- */
- int getDeliveryCount();
-
- void incrementDeliveryCount();
+ E getNextNode();
- void decrementDeliveryCount();
+ E getNextValidEntry();
- Filterable asFilterable();
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Wed Feb 12 13:27:51 2014
@@ -26,11 +26,15 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
import java.util.EnumMap;
import java.util.HashSet;
@@ -40,13 +44,13 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-public abstract class QueueEntryImpl implements QueueEntry
+public abstract class QueueEntryImpl<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements QueueEntry<E,Q,QueueConsumer<?,E,Q,L>>
{
private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
- private final QueueEntryList _queueEntryList;
+ private final L _queueEntryList;
- private MessageReference _message;
+ private final MessageReference _message;
private Set<Long> _rejectedBy = null;
@@ -59,7 +63,7 @@ public abstract class QueueEntryImpl imp
(QueueEntryImpl.class, EntryState.class, "_state");
- private volatile Set<StateChangeListener> _stateChangeListeners;
+ private volatile Set<StateChangeListener<? super E, State>> _stateChangeListeners;
private static final
AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
@@ -86,14 +90,14 @@ public abstract class QueueEntryImpl imp
private boolean _deliveredToConsumer;
- public QueueEntryImpl(QueueEntryList<?> queueEntryList)
+ public QueueEntryImpl(L queueEntryList)
{
this(queueEntryList,null,Long.MIN_VALUE);
_state = DELETED_STATE;
}
- public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message, final long entryId)
+ public QueueEntryImpl(L queueEntryList, ServerMessage message, final long entryId)
{
_queueEntryList = queueEntryList;
@@ -103,7 +107,7 @@ public abstract class QueueEntryImpl imp
populateInstanceProperties();
}
- public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message)
+ public QueueEntryImpl(L queueEntryList, ServerMessage message)
{
_queueEntryList = queueEntryList;
_message = message == null ? null : message.newReference();
@@ -134,7 +138,7 @@ public abstract class QueueEntryImpl imp
return _entryId;
}
- public AMQQueue getQueue()
+ public Q getQueue()
{
return _queueEntryList.getQueue();
}
@@ -183,7 +187,7 @@ public abstract class QueueEntryImpl imp
public boolean acquire()
{
- return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE);
+ return acquire(NON_CONSUMER_ACQUIRED_STATE);
}
private boolean acquire(final EntryState state)
@@ -198,7 +202,7 @@ public abstract class QueueEntryImpl imp
return acquired;
}
- public boolean acquire(Subscription sub)
+ public boolean acquire(QueueConsumer<?,E,Q,L> sub)
{
final boolean acquired = acquire(sub.getOwningState());
if(acquired)
@@ -208,17 +212,17 @@ public abstract class QueueEntryImpl imp
return acquired;
}
- public boolean acquiredBySubscription()
+ public boolean acquiredByConsumer()
{
- return (_state instanceof SubscriptionAcquiredState);
+ return (_state instanceof ConsumerAcquiredState);
}
- public boolean isAcquiredBy(Subscription subscription)
+ public boolean isAcquiredBy(QueueConsumer consumer)
{
EntryState state = _state;
- return state instanceof SubscriptionAcquiredState
- && ((SubscriptionAcquiredState)state).getSubscription() == subscription;
+ return state instanceof ConsumerAcquiredState
+ && ((ConsumerAcquiredState)state).getConsumer() == consumer;
}
public void release()
@@ -228,19 +232,14 @@ public abstract class QueueEntryImpl imp
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
{
- if(state instanceof SubscriptionAcquiredState)
+ if(state instanceof ConsumerAcquiredState)
{
- getQueue().decrementUnackedMsgCount(this);
- Subscription subscription = ((SubscriptionAcquiredState)state).getSubscription();
- if (subscription != null)
- {
- subscription.releaseQueueEntry(this);
- }
+ getQueue().decrementUnackedMsgCount((E) this);
}
if(!getQueue().isDeleted())
{
- getQueue().requeue(this);
+ getQueue().requeue((E)this);
if(_stateChangeListeners != null)
{
notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
@@ -265,12 +264,12 @@ public abstract class QueueEntryImpl imp
return Boolean.TRUE.equals(_instanceProperties.getProperty(InstanceProperties.Property.REDELIVERED));
}
- public Subscription getDeliveredSubscription()
+ public QueueConsumer getDeliveredConsumer()
{
EntryState state = _state;
- if (state instanceof SubscriptionAcquiredState)
+ if (state instanceof ConsumerAcquiredState)
{
- return ((SubscriptionAcquiredState) state).getSubscription();
+ return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer();
}
else
{
@@ -280,16 +279,16 @@ public abstract class QueueEntryImpl imp
public void reject()
{
- Subscription subscription = getDeliveredSubscription();
+ QueueConsumer consumer = getDeliveredConsumer();
- if (subscription != null)
+ if (consumer != null)
{
if (_rejectedBy == null)
{
_rejectedBy = new HashSet<Long>();
}
- _rejectedBy.add(subscription.getSubscriptionID());
+ _rejectedBy.add(consumer.getId());
}
else
{
@@ -297,12 +296,12 @@ public abstract class QueueEntryImpl imp
}
}
- public boolean isRejectedBy(long subscriptionId)
+ public boolean isRejectedBy(QueueConsumer consumer)
{
- if (_rejectedBy != null) // We have subscriptions that rejected this message
+ if (_rejectedBy != null) // We have consumers that rejected this message
{
- return _rejectedBy.contains(subscriptionId);
+ return _rejectedBy.contains(consumer.getId());
}
else // This message hasn't been rejected yet.
{
@@ -316,15 +315,12 @@ public abstract class QueueEntryImpl imp
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
- Subscription s = null;
- if (state instanceof SubscriptionAcquiredState)
+ if (state instanceof ConsumerAcquiredState)
{
- getQueue().decrementUnackedMsgCount(this);
- s = ((SubscriptionAcquiredState) state).getSubscription();
- s.onDequeue(this);
+ getQueue().decrementUnackedMsgCount((E) this);
}
- getQueue().dequeue(this,s);
+ getQueue().dequeue((E)this);
if(_stateChangeListeners != null)
{
notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
@@ -336,9 +332,9 @@ public abstract class QueueEntryImpl imp
private void notifyStateChange(final State oldState, final State newState)
{
- for(StateChangeListener l : _stateChangeListeners)
+ for(StateChangeListener<? super E, State> l : _stateChangeListeners)
{
- l.stateChanged(this, oldState, newState);
+ l.stateChanged((E)this, oldState, newState);
}
}
@@ -348,7 +344,7 @@ public abstract class QueueEntryImpl imp
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
- _queueEntryList.entryDeleted(this);
+ _queueEntryList.entryDeleted((E)this);
onDelete();
_message.release();
@@ -367,44 +363,49 @@ public abstract class QueueEntryImpl imp
dispose();
}
- public int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn)
+ public int routeToAlternate(final Action<? super MessageInstance<?, ? extends Consumer>> action, ServerTransaction txn)
{
final AMQQueue currentQueue = getQueue();
Exchange alternateExchange = currentQueue.getAlternateExchange();
boolean autocommit = txn == null;
+ int enqueues;
+
+ if(autocommit)
+ {
+ txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
+ }
+
if (alternateExchange != null)
{
- if(autocommit)
+ enqueues = alternateExchange.send(getMessage(),
+ getInstanceProperties(),
+ txn,
+ action);
+ }
+ else
+ {
+ enqueues = 0;
+ }
+
+ txn.dequeue(currentQueue, getMessage(), new ServerTransaction.Action()
+ {
+ public void postCommit()
{
- txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
+ delete();
}
- int enqueues = alternateExchange.send(getMessage(), getInstanceProperties(), txn, action);
-
- txn.dequeue(currentQueue, getMessage(), new ServerTransaction.Action()
+ public void onRollback()
{
- public void postCommit()
- {
- delete();
- }
-
- public void onRollback()
- {
-
- }
- });
- if(autocommit)
- {
- txn.commit();
}
- return enqueues;
+ });
- }
- else
+ if(autocommit)
{
- return 0;
+ txn.commit();
}
+
+ return enqueues;
}
public boolean isQueueDeleted()
@@ -412,21 +413,21 @@ public abstract class QueueEntryImpl imp
return getQueue().isDeleted();
}
- public void addStateChangeListener(StateChangeListener listener)
+ public void addStateChangeListener(StateChangeListener<? super E,State> listener)
{
- Set<StateChangeListener> listeners = _stateChangeListeners;
+ Set<StateChangeListener<? super E, State>> listeners = _stateChangeListeners;
if(listeners == null)
{
- _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>());
+ _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<? super E, State>>());
listeners = _stateChangeListeners;
}
listeners.add(listener);
}
- public boolean removeStateChangeListener(StateChangeListener listener)
+ public boolean removeStateChangeListener(StateChangeListener<? super E, State> listener)
{
- Set<StateChangeListener> listeners = _stateChangeListeners;
+ Set<StateChangeListener<? super E, State>> listeners = _stateChangeListeners;
if(listeners != null)
{
return listeners.remove(listener);
@@ -436,9 +437,9 @@ public abstract class QueueEntryImpl imp
}
- public int compareTo(final QueueEntry o)
+ public int compareTo(final E o)
{
- QueueEntryImpl other = (QueueEntryImpl)o;
+ E other = o;
return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
}
@@ -446,7 +447,7 @@ public abstract class QueueEntryImpl imp
{
}
- public QueueEntryList getQueueEntryList()
+ public L getQueueEntryList()
{
return _queueEntryList;
}
@@ -461,6 +462,12 @@ public abstract class QueueEntryImpl imp
return _deliveryCount;
}
+ @Override
+ public int getMaximumDeliveryCount()
+ {
+ return getQueue().getMaximumDeliveryCount();
+ }
+
public void incrementDeliveryCount()
{
_deliveryCountUpdater.incrementAndGet(this);
@@ -485,6 +492,23 @@ public abstract class QueueEntryImpl imp
'}';
}
+ @Override
+ public boolean resend() throws AMQException
+ {
+ QueueConsumer<?,E,Q,L> sub = getDeliveredConsumer();
+ if(sub != null)
+ {
+ return sub.resend((E)this);
+ }
+ return false;
+ }
+
+ @Override
+ public TransactionLogResource getOwningResource()
+ {
+ return getQueue();
+ }
+
private static class EntryInstanceProperties implements InstanceProperties
{
private final EnumMap<Property, Object> _properties = new EnumMap<Property, Object>(Property.class);
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java Wed Feb 12 13:27:51 2014
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.queue;
-public interface QueueEntryIterator<QE extends QueueEntry>
+import org.apache.qpid.server.consumer.Consumer;
+
+public interface QueueEntryIterator<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer>
{
boolean atTail();
- QE getNode();
+ E getNode();
boolean advance();
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Wed Feb 12 13:27:51 2014
@@ -20,21 +20,23 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.message.ServerMessage;
-public interface QueueEntryList<Q extends QueueEntry>
+public interface QueueEntryList<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer>
{
- AMQQueue getQueue();
+ Q getQueue();
- Q add(ServerMessage message);
+ E add(ServerMessage message);
- Q next(Q node);
+ E next(E node);
- QueueEntryIterator<Q> iterator();
+ QueueEntryIterator<E,Q,L,C> iterator();
- Q getHead();
+ E getHead();
- void entryDeleted(Q queueEntry);
+ void entryDeleted(E queueEntry);
int getPriorities();
+
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java Wed Feb 12 13:27:51 2014
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.queue;
-interface QueueEntryListFactory
+interface QueueEntryListFactory<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
{
- public QueueEntryList createQueueEntryList(AMQQueue queue);
+ public L createQueueEntryList(Q queue);
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java Wed Feb 12 13:27:51 2014
@@ -16,7 +16,9 @@
*/
package org.apache.qpid.server.queue;
-public interface QueueEntryVisitor
+import org.apache.qpid.server.consumer.Consumer;
+
+public interface QueueEntryVisitor<E extends QueueEntry>
{
- boolean visit(QueueEntry entry);
+ boolean visit(E entry);
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Wed Feb 12 13:27:51 2014
@@ -32,8 +32,8 @@ import org.apache.qpid.transport.Transpo
/**
* QueueRunners are Runnables used to process a queue when requiring
- * asynchronous message delivery to subscriptions, which is necessary
- * when straight-through delivery of a message to a subscription isn't
+ * asynchronous message delivery to consumers, which is necessary
+ * when straight-through delivery of a message to a consumer isn't
* possible during the enqueue operation.
*/
public class QueueRunner implements Runnable
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org