You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/05/27 14:43:12 UTC
svn commit: r660490 - in /incubator/qpid/branches/broker-queue-refactor/java:
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/server/protocol/
broker/src/main/java/org/apache/qpid/server/queue/ broker/src/main...
Author: rgodfrey
Date: Tue May 27 05:43:04 2008
New Revision: 660490
URL: http://svn.apache.org/viewvc?rev=660490&view=rev
Log:
Refactoring updates (job queue changes, enqueue collections..)
Added:
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
Removed:
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeIdleStatusChecker.java
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Tue May 27 05:43:04 2008
@@ -50,6 +50,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@@ -248,8 +249,10 @@
_logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
}
boolean routed = false;
+ Collection<AMQQueue> queues = new ArrayList<AMQQueue>();
for (Registration e : _bindings)
{
+
if (e.binding.matches(headers))
{
if (_logger.isDebugEnabled())
@@ -257,10 +260,12 @@
_logger.debug("Exchange " + getName() + ": delivering message with headers " +
headers + " to " + e.queue.getName());
}
- payload.enqueue(e.queue);
+ queues.add(e.queue);
+
routed = true;
}
}
+ payload.enqueue(queues);
}
public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Tue May 27 05:43:04 2008
@@ -262,8 +262,24 @@
_filteredQueues.put(queue,newFilters);
}
- public Set<AMQQueue> processMessage(IncomingMessage msg, Set<AMQQueue> queues)
+ public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues)
{
+ if(queues == null)
+ {
+ if(_filteredQueues.isEmpty())
+ {
+ return new ArrayList<AMQQueue>(_unfilteredQueues.keySet());
+ }
+ else
+ {
+ queues = new HashSet<AMQQueue>();
+ }
+ }
+ else if(!(queues instanceof Set))
+ {
+ queues = new HashSet<AMQQueue>(queues);
+ }
+
queues.addAll(_unfilteredQueues.keySet());
if(!_filteredQueues.isEmpty())
{
@@ -621,11 +637,11 @@
}
else
{
- Set<AMQQueue> queues = new HashSet<AMQQueue>();
+ Collection<AMQQueue> queues = results.size() == 1 ? null : new HashSet<AMQQueue>();
for(TopicMatcherResult result : results)
{
- ((TopicExchangeResult)result).processMessage(message, queues);
+ queues = ((TopicExchangeResult)result).processMessage(message, queues);
}
return queues;
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Tue May 27 05:43:04 2008
@@ -444,7 +444,7 @@
public boolean channelAwaitingClosure(int channelId)
{
- return _closingChannelsList.contains(channelId);
+ return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId);
}
public void addChannel(AMQChannel channel) throws AMQException
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue May 27 05:43:04 2008
@@ -136,6 +136,11 @@
}
}
+ public void clearStoreContext()
+ {
+ _storeContext = new StoreContext();
+ }
+
public StoreContext getStoreContext()
{
return _storeContext;
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue May 27 05:43:04 2008
@@ -58,15 +58,13 @@
void unregisterSubscription(final Subscription subscription) throws AMQException;
+
int getConsumerCount();
int getActiveConsumerCount();
boolean isUnused();
-
-
-
boolean isEmpty();
int getMessageCount();
@@ -80,10 +78,27 @@
long getOldestMessageArrivalTime();
-
boolean isDeleted();
+ int delete() throws AMQException;
+
+
+ QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
+
+ void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
+
+ void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
+
+
+
+ boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
+
+
+
+ void addQueueDeleteTask(final Task task);
+
+
List<QueueEntry> getMessagesOnTheQueue();
List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId);
@@ -91,7 +106,6 @@
QueueEntry getMessageOnTheQueue(long messageId);
-
void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
StoreContext storeContext);
@@ -99,9 +113,7 @@
void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext);
- void quiesce();
- void start();
long getMaximumMessageSize();
@@ -132,27 +144,14 @@
- int delete() throws AMQException;
-
-
- QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
-
- void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
-
- void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
-
- void deliverAsync();
-
- void addQueueDeleteTask(final Task task);
-
- boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
-
void removeExpiredIfNoSubscribers() throws AMQException;
Set<NotificationCheck> getNotificationChecks();
void flushSubscription(final Subscription sub) throws AMQException;
+ void deliverAsync(final Subscription sub);
+
/**
* ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Tue May 27 05:43:04 2008
@@ -32,10 +32,8 @@
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.AMQException;
-import org.apache.qpid.common.ClientProperties;
import org.apache.log4j.Logger;
-import java.util.ArrayList;
import java.util.Collection;
public class IncomingMessage implements Filterable<RuntimeException>
@@ -198,19 +196,53 @@
}
else
{
+ int offset;
+ final int queueCount = destinationQueues.size();
+ if(queueCount == 1)
+ {
+ offset = 0;
+ }
+ else
+ {
+ offset = ((int)(message.getMessageId().longValue())) % queueCount;
+ if(offset < 0)
+ {
+ offset = -offset;
+ }
+ }
+
+ int i = 0;
for (AMQQueue q : destinationQueues)
{
- // Increment the references to this message for each queue delivery.
- message.incrementReference();
- // normal deliver so add this message at the end.
- _txnContext.deliver(q, message);
+ if(++i > offset)
+ {
+ // Increment the references to this message for each queue delivery.
+ message.incrementReference();
+ // normal deliver so add this message at the end.
+ _txnContext.deliver(q, message);
+ }
}
+ i = 0;
+ if(offset != 0)
+ {
+ for (AMQQueue q : destinationQueues)
+ {
+ if(i++ < offset)
+ {
+ // Increment the references to this message for each queue delivery.
+ message.incrementReference();
+ // normal deliver so add this message at the end.
+ _txnContext.deliver(q, message);
+ }
+ }
+ }
+
}
// we then allow the transactional context to do something with the message content
// now that it has all been received, before we attempt delivery
_txnContext.messageFullyReceived(isPersistent());
-
+ message.clearStoreContext();
return message;
}
finally
@@ -257,16 +289,6 @@
return _messagePublishInfo.isImmediate();
}
-
- public void enqueue(final AMQQueue q) throws AMQException
- {
- if(_destinationQueues == null)
- {
- _destinationQueues = new ArrayList<AMQQueue>();
- }
- _destinationQueues.add(q);
- }
-
public ContentHeaderBody getContentHeaderBody()
{
return _contentHeaderBody;
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue May 27 05:43:04 2008
@@ -42,7 +42,7 @@
private final SimpleQueueEntryList _queueEntryList;
- private final AMQMessage _message;
+ private AMQMessage _message;
private Set<Subscription> _rejectedBy = null;
@@ -376,7 +376,7 @@
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
- _queueEntryList.advanceHead();
+ _queueEntryList.advanceHead();
return true;
}
else
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue May 27 05:43:04 2008
@@ -11,6 +11,8 @@
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.AMQException;
+import org.apache.qpid.pool.ReadWriteRunnable;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.configuration.Configured;
import org.apache.log4j.Logger;
@@ -21,8 +23,6 @@
import java.util.EnumSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -84,7 +84,7 @@
protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
- private boolean _exclusiveSubscriber;
+ private volatile Subscription _exclusiveSubscriber;
private final QueueEntryList _entries;
@@ -116,9 +116,15 @@
@Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
public long _minimumAlertRepeatGap;
+
+
+ private static final int MAX_ASYNC_DELIVERIES = 10;
+
+
private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
+
private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
private AtomicReference _asynchronousRunner = new AtomicReference(null);
private AtomicInteger _deliveredMessages = new AtomicInteger();
@@ -155,7 +161,9 @@
_virtualHost = virtualHost;
_entries = entryListFactory.createQueueEntryList(this);
- _asyncDelivery = AsyncDeliveryConfig.getAsyncDeliveryExecutor();
+ _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
+
+ AsyncDeliveryConfig.getAsyncDeliveryExecutor();
try
{
@@ -235,11 +243,11 @@
// ------ Manage Subscriptions
- public void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException
+ public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException
{
- if(_exclusiveSubscriber)
+ if(isExclusiveSubscriber())
{
throw new ExistingExclusiveSubscription();
}
@@ -249,7 +257,7 @@
throw new ExistingSubscriptionPreventsExclusive();
}
- _exclusiveSubscriber = exclusive;
+ setExclusiveSubscriber(subscription);
_activeSubscriberCount.incrementAndGet();
subscription.setStateListener(this);
@@ -274,7 +282,7 @@
}
- public void unregisterSubscription(final Subscription subscription) throws AMQException
+ public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException
{
if(subscription == null)
{
@@ -289,9 +297,16 @@
{
subscription.close();
// No longer can the queue have an exclusive consumer
- _exclusiveSubscriber = false;
+ setExclusiveSubscriber(null);
+ QueueEntry lastSeen;
+
+ while((lastSeen = subscription.getLastSeenEntry()) != null)
+ {
+ subscription.setLastSeenEntry(lastSeen, null);
+ }
+
@@ -329,83 +344,84 @@
_totalMessagesReceived.incrementAndGet();
- QueueEntry entry = _entries.add(message);
+ QueueEntry entry;
+ Subscription exclusiveSub = _exclusiveSubscriber;
+ if(exclusiveSub != null)
+ {
+ exclusiveSub.getSendLock();
- /*
+ try
+ {
+ entry = _entries.add(message);
+ deliverToSubscription(exclusiveSub, entry);
- iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
- */
- SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
- SubscriptionList.SubscriptionNode nextNode = node.getNext();
- if(nextNode == null)
- {
- nextNode = _subscriptionList.getHead().getNext();
- }
- while(nextNode != null)
- {
- if(_lastSubscriptionNode.compareAndSet(node, nextNode))
- {
- break;
- }
- else
- {
- node = _lastSubscriptionNode.get();
- nextNode = node.getNext();
- if(nextNode == null)
+ // where there is more than one producer there's a reasonable chance that even though there is
+ // no "queueing" we do not deliver because we get an interleving of _entries.add and
+ // deliverToSubscription between threads. Therefore have one more try.
+ if(!(entry.isAcquired() || entry.isDeleted()))
{
- nextNode = _subscriptionList.getHead().getNext();
+ deliverToSubscription(exclusiveSub, entry);
}
}
+ finally
+ {
+ exclusiveSub.releaseSendLock();
+ }
}
+ else
+ {
+ entry = _entries.add(message);
+ /*
+ iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
- // always do one extra loop after we believe we've finished
- // this catches the case where we *just* miss an update
- int loops = 2;
-
- while(!entry.isAcquired() && loops != 0)
- {
+ */
+ SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
+ SubscriptionList.SubscriptionNode nextNode = node.getNext();
if(nextNode == null)
{
- loops--;
- nextNode = _subscriptionList.getHead();
+ nextNode = _subscriptionList.getHead().getNext();
}
- else
+ while(nextNode != null)
{
- // if subscription at end, and active, offer
- Subscription sub = nextNode.getSubscription();
- synchronized(sub.getSendLock())
+ if(_lastSubscriptionNode.compareAndSet(node, nextNode))
{
- if(subscriptionReadyAndHasInterest(sub, entry)
- && !sub.isSuspended()
- && sub.isActive())
+ break;
+ }
+ else
+ {
+ node = _lastSubscriptionNode.get();
+ nextNode = node.getNext();
+ if(nextNode == null)
{
- if( !sub.wouldSuspend(entry))
- {
- if(!sub.isBrowser() && !entry.acquire(sub))
- {
- sub.restoreCredit(entry);
- }
- else
- {
- QueueEntry queueEntryNode = sub.getLastSeenEntry();
- if(_entries.next(queueEntryNode) == entry)
- {
- sub.setLastSeenEntry(queueEntryNode,entry);
- }
-
- deliverMessage(sub, entry);
-
- }
- }
+ nextNode = _subscriptionList.getHead().getNext();
}
}
}
- nextNode = nextNode.getNext();
- }
+ // always do one extra loop after we believe we've finished
+ // this catches the case where we *just* miss an update
+ int loops = 2;
+
+ while(!entry.isAcquired() && loops != 0)
+ {
+ if(nextNode == null)
+ {
+ loops--;
+ nextNode = _subscriptionList.getHead();
+ }
+ else
+ {
+ // if subscription at end, and active, offer
+ Subscription sub = nextNode.getSubscription();
+ deliverToSubscription(sub, entry);
+ }
+ nextNode = nextNode.getNext();
+
+ }
+ }
if(entry.immediateAndNotDelivered())
@@ -413,7 +429,7 @@
dequeue(storeContext, entry);
entry.dispose(storeContext);
}
- else if(!entry.isAcquired())
+ else if(!(entry.isAcquired() || entry.isDeleted()))
{
checkSubscriptionsNotAheadOfDelivery(entry);
@@ -435,6 +451,42 @@
}
+ private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
+ throws AMQException
+ {
+ sub.getSendLock();
+ try
+ {
+ if(subscriptionReadyAndHasInterest(sub, entry)
+ && !sub.isSuspended()
+ && sub.isActive())
+ {
+ if( !sub.wouldSuspend(entry))
+ {
+ if(!sub.isBrowser() && !entry.acquire(sub))
+ {
+ sub.restoreCredit(entry);
+ }
+ else
+ {
+ QueueEntry queueEntryNode = sub.getLastSeenEntry();
+ if(_entries.next(queueEntryNode) == entry)
+ {
+ sub.setLastSeenEntry(queueEntryNode,entry);
+ }
+
+ deliverMessage(sub, entry);
+
+ }
+ }
+ }
+ }
+ finally
+ {
+ sub.releaseSendLock();
+ }
+ }
+
protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
{
// This method is only required for queues which mess with ordering
@@ -588,7 +640,8 @@
/* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message
entry to resend and move back the subscription pointer. */
- synchronized(subscription.getSendLock())
+ subscription.getSendLock();
+ try
{
if(!subscription.isClosed())
{
@@ -600,6 +653,10 @@
return false;
}
}
+ finally
+ {
+ subscription.releaseSendLock();
+ }
}
@@ -703,7 +760,7 @@
_activeSubscriberCount.incrementAndGet();
}
- deliverAsync();
+ deliverAsync(sub);
}
}
@@ -722,6 +779,16 @@
return _atomicQueueSize;
}
+ private boolean isExclusiveSubscriber()
+ {
+ return _exclusiveSubscriber != null;
+ }
+
+ private void setExclusiveSubscriber(Subscription exclusiveSubscriber)
+ {
+ _exclusiveSubscriber = exclusiveSubscriber;
+ }
+
public static interface QueueEntryFilter
{
public boolean accept(QueueEntry entry);
@@ -999,22 +1066,6 @@
}
- public void quiesce()
- {
- _quiesced.set(true);
- }
-
- public void start()
- {
- if(_quiesced.compareAndSet(true,false))
- {
- deliverAsync();
- }
- }
-
-
-
-
// ------ Management functions
@@ -1088,6 +1139,7 @@
}
_deleteTaskList.clear();
+ ReferenceCountingExecutorService.getInstance().releaseExecutorService();
}
return getMessageCount();
@@ -1098,13 +1150,20 @@
{
_stateChangeCount.incrementAndGet();
- if(_asynchronousRunner.get() == null)
- {
- _asyncDelivery.execute(new Runner());
+ Runner runner = new Runner();
+
+ if(_asynchronousRunner.compareAndSet(null,runner))
+ {
+ _asyncDelivery.execute(runner);
}
}
- private class Runner implements Runnable
+ public void deliverAsync(Subscription sub)
+ {
+ _asyncDelivery.execute(new SubFlushRunner(sub));
+ }
+
+ private class Runner implements ReadWriteRunnable
{
public void run()
{
@@ -1118,21 +1177,77 @@
}
}
+
+ public boolean isRead()
+ {
+ return false;
+ }
+
+ public boolean isWrite()
+ {
+ return true;
+ }
+ }
+
+
+ private class SubFlushRunner implements ReadWriteRunnable
+ {
+ private final Subscription _sub;
+
+
+ public SubFlushRunner(Subscription sub)
+ {
+ _sub = sub;
+ }
+
+ public void run()
+ {
+ boolean complete = false;
+ try
+ {
+ complete = flushSubscription(_sub, MAX_ASYNC_DELIVERIES);
+
+ }
+ catch (AMQException e)
+ {
+ _logger.error(e);
+ }
+ if(!complete && !_sub.isSuspended())
+ {
+ _asyncDelivery.execute(this);
+ }
+
+ }
+
+ public boolean isRead()
+ {
+ return false;
+ }
+
+ public boolean isWrite()
+ {
+ return true;
+ }
}
public void flushSubscription(Subscription sub) throws AMQException
{
+ flushSubscription(sub, Long.MAX_VALUE);
+ }
+
+ public boolean flushSubscription(Subscription sub, long deliveries) throws AMQException
+ {
boolean atTail = false;
- while(sub.isActive() && !atTail)
+
+ while(!sub.isSuspended() && !atTail && deliveries != 0)
{
- synchronized(sub.getSendLock())
+ sub.getSendLock();
+ try
{
if(sub.isActive())
{
-
QueueEntry node = moveSubscriptionToNextNode(sub);
-
if(!(node.isAcquired() || node.isDeleted()))
{
if(!sub.isSuspended())
@@ -1148,6 +1263,7 @@
}
else
{
+ deliveries--;
deliverMessage(sub, node);
if(sub.isBrowser())
@@ -1159,8 +1275,6 @@
sub.setLastSeenEntry(node, newNode);
node = sub.getLastSeenEntry();
}
-
-
}
}
@@ -1180,13 +1294,36 @@
}
}
}
+
}
atTail = (_entries.next(node) == null);
}
}
+ finally
+ {
+ sub.releaseSendLock();
+ }
}
+
+ if(!isExclusiveSubscriber())
+ {
+ advanceAllSubscriptions();
+ }
+
+ return atTail;
+ }
+
+ protected void advanceAllSubscriptions() throws AMQException
+ {
+ SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ while(subscriberIter.advance())
+ {
+ SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
+ Subscription sub = subNode.getSubscription();
+ moveSubscriptionToNextNode(sub);
+ }
}
private QueueEntry moveSubscriptionToNextNode(final Subscription sub)
@@ -1227,8 +1364,11 @@
boolean deliveryIncomplete = true;
int extraLoops = 1;
+ int deliveries = MAX_ASYNC_DELIVERIES;
+
+ _asynchronousRunner.compareAndSet(runner,null);
- while(((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete ) && _asynchronousRunner.compareAndSet(null,runner))
+ while(deliveries != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete ) && _asynchronousRunner.compareAndSet(null,runner))
{
// we want to have one extra loop after the every subscription has reached the point where it cannot move
// further, just in case the advance of one subscription in the last loop allows a different subscription to
@@ -1251,17 +1391,21 @@
Subscription sub = subscriptionIter.getNode().getSubscription();
if(sub != null)
{
- synchronized(sub.getSendLock())
+ sub.getSendLock();
+ try
{
+ QueueEntry node = moveSubscriptionToNextNode(sub);
+
if(sub.isActive())
{
boolean advanced = false;
+ boolean subActive = false;
- QueueEntry node = moveSubscriptionToNextNode(sub);
if(!(node.isAcquired() || node.isDeleted()))
{
if(!sub.isSuspended())
{
+ subActive = true;
if(sub.hasInterest(node))
{
if(!sub.wouldSuspend(node))
@@ -1274,6 +1418,7 @@
else
{
deliverMessage(sub, node);
+ deliveries--;
if(sub.isBrowser())
{
@@ -1309,7 +1454,7 @@
}
final boolean atTail = (_entries.next(node) == null);
- done = done && atTail;
+ done = done && (!subActive || atTail);
if(atTail && !advanced && sub.isAutoClose())
{
@@ -1322,6 +1467,10 @@
}
}
+ finally
+ {
+ sub.releaseSendLock();
+ }
}
if(done)
{
@@ -1346,6 +1495,10 @@
}
+ if(deliveries == 0 && _asynchronousRunner.compareAndSet(null,runner))
+ {
+ _asyncDelivery.execute(runner);
+ }
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Tue May 27 05:43:04 2008
@@ -28,7 +28,6 @@
public interface Subscription
{
- boolean isActive();
public static enum State
@@ -75,7 +74,7 @@
boolean wouldSuspend(QueueEntry msg);
Object getSendLock();
-
+ void releaseSendLock();
void resend(final QueueEntry entry) throws AMQException;
@@ -87,4 +86,9 @@
boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue);
+
+ boolean isActive();
+
+
+
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Tue May 27 05:43:04 2008
@@ -22,6 +22,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.Lock;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -62,6 +65,9 @@
private final RecordDeliveryMethod _recordMethod;
private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+ private final Lock _stateChangeLock;
+ private final Lock _stateChangeExclusiveLock;
+
static final class BrowserSubscription extends SubscriptionImpl
@@ -254,7 +260,8 @@
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
private AMQQueue _queue;
- private final AtomicBoolean _sendLock = new AtomicBoolean(false);
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
@@ -280,7 +287,9 @@
_deliveryMethod = deliveryMethod;
_recordMethod = recordMethod;
-
+ ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ _stateChangeLock = readWriteLock.readLock();
+ _stateChangeExclusiveLock = readWriteLock.writeLock();
if (arguments != null)
{
@@ -334,7 +343,7 @@
public boolean isSuspended()
{
- return !isActive() || _channel.isSuspended() || _sendLock.get();
+ return !isActive() || _channel.isSuspended() || _deleted.get();
}
/**
@@ -344,7 +353,7 @@
*/
public void queueDeleted(AMQQueue queue)
{
- _sendLock.set(true);
+ _deleted.set(true);
// _channel.queueDeleted(queue);
}
@@ -435,7 +444,9 @@
{
boolean closed = false;
State state = getState();
- synchronized (_sendLock)
+
+ _stateChangeExclusiveLock.lock();
+ try
{
while(!closed && state != State.CLOSED)
{
@@ -451,6 +462,11 @@
}
_creditManager.removeListener(this);
}
+ finally
+ {
+ _stateChangeExclusiveLock.unlock();
+ }
+
if (closed)
{
@@ -481,7 +497,13 @@
public Object getSendLock()
{
- return _sendLock;
+ _stateChangeLock.lock();
+ return _deleted;
+ }
+
+ public void releaseSendLock()
+ {
+ _stateChangeLock.unlock();
}
public void resend(final QueueEntry entry) throws AMQException
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Tue May 27 05:43:04 2008
@@ -44,6 +44,7 @@
import javax.management.Notification;
import java.util.LinkedList;
+import java.util.Collections;
/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
public class AMQQueueAlertTest extends TestCase
@@ -303,7 +304,7 @@
for (int i = 0; i < messages.length; i++)
{
messages[i] = message(false, size);
- messages[i].enqueue(_queue);
+ messages[i].enqueue(Collections.singleton(_queue));
messages[i].routingComplete(_messageStore, new MessageHandleFactory());
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Tue May 27 05:43:04 2008
@@ -48,6 +48,7 @@
import javax.management.JMException;
import java.util.LinkedList;
+import java.util.Collections;
/**
* Test class to test AMQQueueMBean attribtues and operations
@@ -216,7 +217,7 @@
long id = msg.getMessageId();
_queue.clearQueue(_storeContext);
- msg.enqueue(_queue);
+ msg.enqueue(Collections.singleton(_queue));
msg.routingComplete(_messageStore, new MessageHandleFactory());
msg.addContentBodyFrame(new ContentChunk()
@@ -318,7 +319,7 @@
for (int i = 0; i < messageCount; i++)
{
IncomingMessage currentMessage = message(false, persistent);
- currentMessage.enqueue(_queue);
+ currentMessage.enqueue(Collections.singleton(_queue));
// route header
currentMessage.routingComplete(_messageStore, new MessageHandleFactory());
Modified: incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Tue May 27 05:43:04 2008
@@ -464,15 +464,49 @@
return false;
}
- if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
+ final int hashCode = _hashCode;
+
+ final int otherHashCode = otherString._hashCode;
+
+ if ((hashCode != 0) && (otherHashCode != 0) && (hashCode != otherHashCode))
{
return false;
}
+ final int length = _length;
+
+ if(length != otherString._length)
+ {
+ return false;
+ }
- return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data))
- || Arrays.equals(getBytes(),otherString.getBytes());
+ final byte[] data = _data;
+
+ final byte[] otherData = otherString._data;
+
+ final int offset = _offset;
+
+ final int otherOffset = otherString._offset;
+
+ if(offset == 0 && otherOffset == 0 && length == data.length && length == otherData.length)
+ {
+ return Arrays.equals(data, otherData);
+ }
+ else
+ {
+ int thisIdx = offset;
+ int otherIdx = otherOffset;
+ for(int i = length; i-- != 0; )
+ {
+ if(!(data[thisIdx++] == otherData[otherIdx++]))
+ {
+ return false;
+ }
+ }
+ }
+
+ return true;
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java Tue May 27 05:43:04 2008
@@ -41,16 +41,16 @@
private final ReentrantLock _putLock = new ReentrantLock();
- private final ConcurrentLinkedQueue<Job> _readJobQueue = new ConcurrentLinkedQueue<Job>();
+ private final ConcurrentLinkedQueue<ReadWriteRunnable> _readJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
- private final ConcurrentLinkedQueue<Job> _writeJobQueue = new ConcurrentLinkedQueue<Job>();
+ private final ConcurrentLinkedQueue<ReadWriteRunnable> _writeJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
private class ReadWriteJobIterator implements Iterator<Runnable>
{
private boolean _onReads;
- private Iterator<Job> _iter = _writeJobQueue.iterator();
+ private Iterator<ReadWriteRunnable> _iter = _writeJobQueue.iterator();
public boolean hasNext()
{
@@ -112,12 +112,12 @@
public boolean offer(final Runnable runnable)
{
- final Job job = (Job) runnable;
+ final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
final ReentrantLock putLock = _putLock;
putLock.lock();
try
{
- if(job.isReadJob())
+ if(job.isRead())
{
_readJobQueue.offer(job);
}
@@ -147,13 +147,13 @@
public void put(final Runnable runnable) throws InterruptedException
{
- final Job job = (Job) runnable;
+ final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
final ReentrantLock putLock = _putLock;
putLock.lock();
try
{
- if(job.isReadJob())
+ if(job.isRead())
{
_readJobQueue.offer(job);
}
@@ -185,13 +185,13 @@
public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException
{
- final Job job = (Job) runnable;
+ final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
final ReentrantLock putLock = _putLock;
putLock.lock();
try
{
- if(job.isReadJob())
+ if(job.isRead())
{
_readJobQueue.offer(job);
}
@@ -240,7 +240,7 @@
throw ie;
}
- Job job = _writeJobQueue.poll();
+ ReadWriteRunnable job = _writeJobQueue.poll();
if(job == null)
{
job = _readJobQueue.poll();
@@ -266,7 +266,7 @@
final AtomicInteger count = _count;
long nanos = unit.toNanos(timeout);
takeLock.lockInterruptibly();
- Job job = null;
+ ReadWriteRunnable job = null;
try
{
@@ -322,7 +322,7 @@
_takeLock.lock();
try
{
- Job job;
+ ReadWriteRunnable job;
while((job = _writeJobQueue.peek())!= null)
{
c.add(job);
@@ -356,7 +356,7 @@
_takeLock.lock();
try
{
- Job job;
+ ReadWriteRunnable job;
while(total<=maxElements && (job = _writeJobQueue.peek())!= null)
{
c.add(job);
@@ -391,7 +391,7 @@
{
if(_count.get() > 0)
{
- Job job = _writeJobQueue.poll();
+ ReadWriteRunnable job = _writeJobQueue.poll();
if(job == null)
{
job = _readJobQueue.poll();
@@ -417,7 +417,7 @@
takeLock.lock();
try
{
- Job job = _writeJobQueue.peek();
+ ReadWriteRunnable job = _writeJobQueue.peek();
if(job == null)
{
job = _readJobQueue.peek();
Added: incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java?rev=660490&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java Tue May 27 05:43:04 2008
@@ -0,0 +1,27 @@
+package org.apache.qpid.pool;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public interface ReadWriteRunnable extends Runnable
+{
+ boolean isRead();
+ boolean isWrite();
+}
Modified: incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java Tue May 27 05:43:04 2008
@@ -110,7 +110,7 @@
*
* @return An executor service.
*/
- ExecutorService acquireExecutorService()
+ public ExecutorService acquireExecutorService()
{
synchronized (_lock)
{
@@ -140,7 +140,7 @@
* Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls
* to zero, the executor service is shut down.
*/
- void releaseExecutorService()
+ public void releaseExecutorService()
{
synchronized (_lock)
{
Modified: incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java Tue May 27 05:43:04 2008
@@ -22,6 +22,8 @@
import java.util.List;
import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
import javax.management.JMException;
import javax.management.openmbean.OpenDataException;
@@ -201,8 +203,10 @@
headers.put(key, value);
((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers);
AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue"));
-
- payload.enqueue(q);
+
+ Collection<AMQQueue> queues = new ArrayList<AMQQueue>();
+ queues.add(q);
+ payload.enqueue(queues);
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Tue May 27 05:43:04 2008
@@ -42,6 +42,7 @@
import java.util.LinkedList;
import java.util.Set;
+import java.util.Collections;
/**
* Tests that acknowledgements are handled correctly.
@@ -145,7 +146,7 @@
// we increment the reference here since we are not delivering the messaging to any queues, which is where
// the reference is normally incremented. The test is easier to construct if we have direct access to the
// subscription
- msg.enqueue(_queue);
+ msg.enqueue(Collections.singleton(_queue));
msg.routingComplete(_messageStore, factory);
if(msg.allContentReceived())
{
Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Tue May 27 05:43:04 2008
@@ -91,6 +91,11 @@
return new Object();
}
+ public void releaseSendLock()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void resend(final QueueEntry entry)
{
//To change body of implemented methods use File | Settings | File Templates.