You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/05/27 14:43:12 UTC

svn commit: r660490 - in /incubator/qpid/branches/broker-queue-refactor/java: broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpid/server/queue/ broker/src/main...

Author: rgodfrey
Date: Tue May 27 05:43:04 2008
New Revision: 660490

URL: http://svn.apache.org/viewvc?rev=660490&view=rev
Log:
Refactoring updates (job queue changes, enqueue collections..)

Added:
    incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
Removed:
    incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeIdleStatusChecker.java
Modified:
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
    incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
    incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
    incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
    incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
    incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Tue May 27 05:43:04 2008
@@ -50,6 +50,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Collection;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
@@ -248,8 +249,10 @@
             _logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
         }
         boolean routed = false;
+        Collection<AMQQueue> queues = new ArrayList<AMQQueue>();
         for (Registration e : _bindings)
         {
+
             if (e.binding.matches(headers))
             {
                 if (_logger.isDebugEnabled())
@@ -257,10 +260,12 @@
                     _logger.debug("Exchange " + getName() + ": delivering message with headers " +
                                   headers + " to " + e.queue.getName());
                 }
-                payload.enqueue(e.queue);
+                queues.add(e.queue);
+
                 routed = true;
             }
         }
+        payload.enqueue(queues);
     }
 
     public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Tue May 27 05:43:04 2008
@@ -262,8 +262,24 @@
             _filteredQueues.put(queue,newFilters);
         }
 
-        public Set<AMQQueue> processMessage(IncomingMessage msg, Set<AMQQueue> queues)
+        public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues)
         {
+            if(queues == null)
+            {
+                if(_filteredQueues.isEmpty())
+                {
+                    return new ArrayList<AMQQueue>(_unfilteredQueues.keySet());
+                }
+                else
+                {
+                    queues = new HashSet<AMQQueue>();
+                }
+            }
+            else if(!(queues instanceof Set))
+            {
+                queues = new HashSet<AMQQueue>(queues);
+            }
+
             queues.addAll(_unfilteredQueues.keySet());
             if(!_filteredQueues.isEmpty())
             {
@@ -621,11 +637,11 @@
         }
         else
         {
-            Set<AMQQueue> queues = new HashSet<AMQQueue>();
+            Collection<AMQQueue> queues = results.size() == 1 ? null : new HashSet<AMQQueue>();
             for(TopicMatcherResult result : results)
             {
 
-                ((TopicExchangeResult)result).processMessage(message, queues);
+                queues = ((TopicExchangeResult)result).processMessage(message, queues);
             }
             return queues;
         }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Tue May 27 05:43:04 2008
@@ -444,7 +444,7 @@
 
     public boolean channelAwaitingClosure(int channelId)
     {
-        return _closingChannelsList.contains(channelId);
+        return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId);
     }
 
     public void addChannel(AMQChannel channel) throws AMQException

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue May 27 05:43:04 2008
@@ -136,6 +136,11 @@
         }
     }
 
+    public void clearStoreContext()
+    {
+        _storeContext = new StoreContext();
+    }
+
     public StoreContext getStoreContext()
     {
         return _storeContext;

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue May 27 05:43:04 2008
@@ -58,15 +58,13 @@
 
     void unregisterSubscription(final Subscription subscription) throws AMQException;
 
+
     int getConsumerCount();
 
     int getActiveConsumerCount();
 
     boolean isUnused();
 
-
-
-
     boolean isEmpty();
 
     int getMessageCount();
@@ -80,10 +78,27 @@
 
     long getOldestMessageArrivalTime();
 
-
     boolean isDeleted();
 
 
+    int delete() throws AMQException;
+
+
+    QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
+
+    void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
+
+    void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
+
+
+
+    boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
+
+    
+
+    void addQueueDeleteTask(final Task task);
+
+
     List<QueueEntry> getMessagesOnTheQueue();
 
     List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId);
@@ -91,7 +106,6 @@
     QueueEntry getMessageOnTheQueue(long messageId);
 
 
-
     void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
                                                         StoreContext storeContext);
 
@@ -99,9 +113,7 @@
 
     void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext);
 
-    void quiesce();
 
-    void start();
 
     long getMaximumMessageSize();
 
@@ -132,27 +144,14 @@
 
 
 
-    int delete() throws AMQException;
-
-
-    QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
-
-    void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
-
-    void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
-
-    void deliverAsync();
-
-    void addQueueDeleteTask(final Task task);
-
-    boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
-
     void removeExpiredIfNoSubscribers() throws AMQException;
 
     Set<NotificationCheck> getNotificationChecks();
 
     void flushSubscription(final Subscription sub) throws AMQException;
 
+    void deliverAsync(final Subscription sub);
+
 
     /**
      * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Tue May 27 05:43:04 2008
@@ -32,10 +32,8 @@
 import org.apache.qpid.server.exchange.NoRouteException;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.common.ClientProperties;
 import org.apache.log4j.Logger;
 
-import java.util.ArrayList;
 import java.util.Collection;
 
 public class IncomingMessage implements Filterable<RuntimeException>
@@ -198,19 +196,53 @@
             }
             else
             {
+                int offset;
+                final int queueCount = destinationQueues.size();
+                if(queueCount == 1)
+                {
+                    offset = 0;
+                }
+                else
+                {
+                    offset = ((int)(message.getMessageId().longValue())) % queueCount;
+                    if(offset < 0)
+                    {
+                        offset = -offset;
+                    }
+                }
+
+                int i = 0;
                 for (AMQQueue q : destinationQueues)
                 {
-                    // Increment the references to this message for each queue delivery.
-                    message.incrementReference();
-                    // normal deliver so add this message at the end.
-                    _txnContext.deliver(q, message);
+                    if(++i > offset)
+                    {
+                        // Increment the references to this message for each queue delivery.
+                        message.incrementReference();
+                        // normal deliver so add this message at the end.
+                        _txnContext.deliver(q, message);
+                    }
                 }
+                i = 0;
+                if(offset != 0)
+                {
+                    for (AMQQueue q : destinationQueues)
+                    {
+                        if(i++ < offset)
+                        {
+                            // Increment the references to this message for each queue delivery.
+                            message.incrementReference();
+                            // normal deliver so add this message at the end.
+                            _txnContext.deliver(q, message);
+                        }
+                    }
+                }
+
             }
 
             // we then allow the transactional context to do something with the message content
             // now that it has all been received, before we attempt delivery
             _txnContext.messageFullyReceived(isPersistent());
-
+            message.clearStoreContext();
             return message;
         }
         finally
@@ -257,16 +289,6 @@
         return _messagePublishInfo.isImmediate();
     }
 
-    
-    public void enqueue(final AMQQueue q) throws AMQException
-    {
-        if(_destinationQueues == null)
-        {
-            _destinationQueues = new ArrayList<AMQQueue>();
-        }
-        _destinationQueues.add(q);
-    }
-
     public ContentHeaderBody getContentHeaderBody()
     {
         return _contentHeaderBody;

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue May 27 05:43:04 2008
@@ -42,7 +42,7 @@
 
     private final SimpleQueueEntryList _queueEntryList;
 
-    private final AMQMessage _message;
+    private AMQMessage _message;
 
 
     private Set<Subscription> _rejectedBy = null;
@@ -376,7 +376,7 @@
 
         if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
         {
-            _queueEntryList.advanceHead();
+            _queueEntryList.advanceHead();            
             return true;
         }
         else

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue May 27 05:43:04 2008
@@ -11,6 +11,8 @@
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.pool.ReadWriteRunnable;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.configuration.Configured;
 import org.apache.log4j.Logger;
 
@@ -21,8 +23,6 @@
 import java.util.EnumSet;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -84,7 +84,7 @@
     protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
     private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
 
-    private boolean _exclusiveSubscriber;
+    private volatile Subscription _exclusiveSubscriber;
 
 
     private final QueueEntryList _entries;
@@ -116,9 +116,15 @@
     @Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
     public long _minimumAlertRepeatGap;
 
+
+
+    private static final int MAX_ASYNC_DELIVERIES = 10;
+
+
     private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
 
 
+
     private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
     private AtomicReference _asynchronousRunner = new AtomicReference(null);
     private AtomicInteger _deliveredMessages = new AtomicInteger();
@@ -155,7 +161,9 @@
         _virtualHost = virtualHost;
         _entries = entryListFactory.createQueueEntryList(this);
 
-        _asyncDelivery = AsyncDeliveryConfig.getAsyncDeliveryExecutor();
+        _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
+
+        AsyncDeliveryConfig.getAsyncDeliveryExecutor();
 
         try
         {
@@ -235,11 +243,11 @@
 
     // ------ Manage Subscriptions
 
-    public void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException
+    public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException
     {
 
 
-        if(_exclusiveSubscriber)
+        if(isExclusiveSubscriber())
         {
             throw new ExistingExclusiveSubscription();
         }
@@ -249,7 +257,7 @@
             throw new ExistingSubscriptionPreventsExclusive();
         }
 
-        _exclusiveSubscriber = exclusive;
+        setExclusiveSubscriber(subscription);
 
         _activeSubscriberCount.incrementAndGet();
         subscription.setStateListener(this);
@@ -274,7 +282,7 @@
 
     }
 
-    public void unregisterSubscription(final Subscription subscription) throws AMQException
+    public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException
     {
         if(subscription == null)
         {
@@ -289,9 +297,16 @@
         {
             subscription.close();
             // No longer can the queue have an exclusive consumer
-            _exclusiveSubscriber = false;
+            setExclusiveSubscriber(null);
 
 
+            QueueEntry lastSeen;
+
+            while((lastSeen = subscription.getLastSeenEntry()) != null)
+            {
+                subscription.setLastSeenEntry(lastSeen, null);
+            }
+
 
 
 
@@ -329,83 +344,84 @@
         _totalMessagesReceived.incrementAndGet();
 
 
-        QueueEntry entry = _entries.add(message);
+        QueueEntry entry; 
+        Subscription exclusiveSub = _exclusiveSubscriber;
+        if(exclusiveSub != null)
+        {
+            exclusiveSub.getSendLock();
 
-        /*
+            try
+            {
+                entry = _entries.add(message);
+                deliverToSubscription(exclusiveSub, entry);
 
-        iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
 
-         */
-        SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
-        SubscriptionList.SubscriptionNode nextNode = node.getNext();
-        if(nextNode == null)
-        {
-            nextNode = _subscriptionList.getHead().getNext();
-        }
-        while(nextNode != null)
-        {
-            if(_lastSubscriptionNode.compareAndSet(node, nextNode))
-            {
-                break;
-            }
-            else
-            {
-                node = _lastSubscriptionNode.get();
-                nextNode = node.getNext();
-                if(nextNode == null)
+                // where there is more than one producer there's a reasonable chance that even though there is
+                // no "queueing" we do not deliver because we get an interleaving of _entries.add and
+                // deliverToSubscription between threads.  Therefore have one more try. 
+                if(!(entry.isAcquired()  || entry.isDeleted()))
                 {
-                    nextNode = _subscriptionList.getHead().getNext();
+                    deliverToSubscription(exclusiveSub, entry);
                 }
             }
+            finally
+            {
+                exclusiveSub.releaseSendLock();
+            }
         }
+        else
+        {
+            entry = _entries.add(message);
+            /*
 
+            iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
 
-        // always do one extra loop after we believe we've finished
-        // this catches the case where we *just* miss an update
-        int loops = 2;
-
-        while(!entry.isAcquired() && loops != 0)
-        {
+             */
+            SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
+            SubscriptionList.SubscriptionNode nextNode = node.getNext();
             if(nextNode == null)
             {
-                loops--;
-                nextNode = _subscriptionList.getHead();
+                nextNode = _subscriptionList.getHead().getNext();
             }
-            else
+            while(nextNode != null)
             {
-                // if subscription at end, and active, offer
-                Subscription sub = nextNode.getSubscription();
-                synchronized(sub.getSendLock())
+                if(_lastSubscriptionNode.compareAndSet(node, nextNode))
                 {
-                    if(subscriptionReadyAndHasInterest(sub, entry)
-                       && !sub.isSuspended()
-                       && sub.isActive())
+                    break;
+                }
+                else
+                {
+                    node = _lastSubscriptionNode.get();
+                    nextNode = node.getNext();
+                    if(nextNode == null)
                     {
-                        if( !sub.wouldSuspend(entry))
-                        {
-                            if(!sub.isBrowser() && !entry.acquire(sub))
-                            {
-                                sub.restoreCredit(entry);
-                            }
-                            else
-                            {
-                                QueueEntry queueEntryNode =  sub.getLastSeenEntry();
-                                if(_entries.next(queueEntryNode) == entry)
-                                {
-                                    sub.setLastSeenEntry(queueEntryNode,entry);
-                                }
-
-                                deliverMessage(sub, entry);
-
-                            }
-                        }
+                        nextNode = _subscriptionList.getHead().getNext();
                     }
                 }
             }
-            nextNode = nextNode.getNext();
 
-        }
 
+            // always do one extra loop after we believe we've finished
+            // this catches the case where we *just* miss an update
+            int loops = 2;
+
+            while(!entry.isAcquired() && loops != 0)
+            {
+                if(nextNode == null)
+                {
+                    loops--;
+                    nextNode = _subscriptionList.getHead();
+                }
+                else
+                {
+                    // if subscription at end, and active, offer
+                    Subscription sub = nextNode.getSubscription();
+                    deliverToSubscription(sub, entry);
+                }
+                nextNode = nextNode.getNext();
+
+            }
+        }
 
 
         if(entry.immediateAndNotDelivered())
@@ -413,7 +429,7 @@
             dequeue(storeContext, entry);
             entry.dispose(storeContext);
         }
-        else if(!entry.isAcquired())
+        else if(!(entry.isAcquired()  || entry.isDeleted()))
         {
             checkSubscriptionsNotAheadOfDelivery(entry);
 
@@ -435,6 +451,42 @@
 
     }
 
+    private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
+            throws AMQException
+    {
+        sub.getSendLock();
+        try
+        {
+            if(subscriptionReadyAndHasInterest(sub, entry)
+               && !sub.isSuspended()
+               && sub.isActive())
+            {
+                if( !sub.wouldSuspend(entry))
+                {
+                    if(!sub.isBrowser() && !entry.acquire(sub))
+                    {
+                        sub.restoreCredit(entry);
+                    }
+                    else
+                    {
+                        QueueEntry queueEntryNode =  sub.getLastSeenEntry();
+                        if(_entries.next(queueEntryNode) == entry)
+                        {
+                            sub.setLastSeenEntry(queueEntryNode,entry);
+                        }
+
+                        deliverMessage(sub, entry);
+
+                    }
+                }
+            }
+        }
+        finally
+        {
+            sub.releaseSendLock();
+        }
+    }
+
     protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
     {
         // This method is only required for queues which mess with ordering
@@ -588,7 +640,8 @@
         /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message
                   entry to resend and move back the subscription pointer. */
 
-        synchronized(subscription.getSendLock())
+        subscription.getSendLock();
+        try
         {
             if(!subscription.isClosed())
             {
@@ -600,6 +653,10 @@
                 return false;
             }
         }
+        finally
+        {
+            subscription.releaseSendLock();
+        }
     }
 
 
@@ -703,7 +760,7 @@
                 _activeSubscriberCount.incrementAndGet();
 
             }
-            deliverAsync();
+            deliverAsync(sub);
         }
     }
 
@@ -722,6 +779,16 @@
         return _atomicQueueSize;
     }
 
+    private boolean isExclusiveSubscriber()
+    {
+        return _exclusiveSubscriber != null;
+    }
+
+    private void setExclusiveSubscriber(Subscription exclusiveSubscriber)
+    {
+        _exclusiveSubscriber = exclusiveSubscriber;
+    }
+
     public static interface QueueEntryFilter
     {
         public boolean accept(QueueEntry entry);
@@ -999,22 +1066,6 @@
 
     }
 
-    public void quiesce()
-    {
-        _quiesced.set(true);
-    }
-
-    public void start()
-    {
-        if(_quiesced.compareAndSet(true,false))
-        {
-            deliverAsync();
-        }
-    }
-
-
-
-
     // ------ Management functions
 
 
@@ -1088,6 +1139,7 @@
             }
 
             _deleteTaskList.clear();
+            ReferenceCountingExecutorService.getInstance().releaseExecutorService();
         }
         return getMessageCount();
 
@@ -1098,13 +1150,20 @@
     {
         _stateChangeCount.incrementAndGet();
 
-        if(_asynchronousRunner.get() == null)
-        {
-            _asyncDelivery.execute(new Runner());
+        Runner runner = new Runner();
+
+        if(_asynchronousRunner.compareAndSet(null,runner))
+        {            
+            _asyncDelivery.execute(runner);
         }
     }
 
-    private class Runner implements Runnable
+    public void deliverAsync(Subscription sub)
+    {
+        _asyncDelivery.execute(new SubFlushRunner(sub));
+    }
+
+    private class Runner implements ReadWriteRunnable
     {
         public void run()
         {
@@ -1118,21 +1177,77 @@
             }
 
         }
+
+        public boolean isRead()
+        {
+            return false;
+        }
+
+        public boolean isWrite()
+        {
+            return true;
+        }
+    }
+
+
+    private class SubFlushRunner implements ReadWriteRunnable
+    {
+        private final Subscription _sub;
+
+
+        public SubFlushRunner(Subscription sub)
+        {
+            _sub = sub;
+        }
+
+        public void run()
+        {
+            boolean complete = false;
+            try
+            {
+                complete = flushSubscription(_sub, MAX_ASYNC_DELIVERIES);
+
+            }
+            catch (AMQException e)
+            {
+                _logger.error(e);
+            }
+            if(!complete && !_sub.isSuspended())
+            {
+                _asyncDelivery.execute(this);
+            }
+
+        }
+
+        public boolean isRead()
+        {
+            return false;
+        }
+
+        public boolean isWrite()
+        {
+            return true;
+        }
     }
 
     public void flushSubscription(Subscription sub) throws AMQException
     {
+        flushSubscription(sub, Long.MAX_VALUE);
+    }
+
+    public boolean flushSubscription(Subscription sub, long deliveries) throws AMQException
+    {
         boolean atTail = false;
-        while(sub.isActive() && !atTail)
+
+        while(!sub.isSuspended() && !atTail && deliveries != 0)
         {
 
-            synchronized(sub.getSendLock())
+            sub.getSendLock();
+            try
             {
                 if(sub.isActive())
                 {
-
                     QueueEntry node = moveSubscriptionToNextNode(sub);
-
                     if(!(node.isAcquired() || node.isDeleted()))
                     {
                         if(!sub.isSuspended())
@@ -1148,6 +1263,7 @@
                                     }
                                     else
                                     {
+                                        deliveries--;
                                         deliverMessage(sub, node);
 
                                         if(sub.isBrowser())
@@ -1159,8 +1275,6 @@
                                                 sub.setLastSeenEntry(node, newNode);
                                                 node = sub.getLastSeenEntry();
                                             }
-
-
                                         }
                                     }
 
@@ -1180,13 +1294,36 @@
                                 }
                             }
                         }
+
                     }
                     atTail = (_entries.next(node) == null);
 
                 }
             }
+            finally
+            {
+                sub.releaseSendLock();
+            }
 
         }
+
+        if(!isExclusiveSubscriber())
+        {
+            advanceAllSubscriptions();
+        }
+
+        return atTail;
+    }
+
+    protected void advanceAllSubscriptions() throws AMQException
+    {
+        SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+        while(subscriberIter.advance())
+        {
+            SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
+            Subscription sub = subNode.getSubscription();
+            moveSubscriptionToNextNode(sub);
+        }
     }
 
     private QueueEntry moveSubscriptionToNextNode(final Subscription sub)
@@ -1227,8 +1364,11 @@
         boolean deliveryIncomplete = true;
 
         int extraLoops = 1;
+        int deliveries = MAX_ASYNC_DELIVERIES;
+
+        _asynchronousRunner.compareAndSet(runner,null);
 
-        while(((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete ) && _asynchronousRunner.compareAndSet(null,runner))
+        while(deliveries != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete ) && _asynchronousRunner.compareAndSet(null,runner))
         {
             // we want to have one extra loop after every subscription has reached the point where it cannot move
             // further, just in case the advance of one subscription in the last loop allows a different subscription to
@@ -1251,17 +1391,21 @@
                 Subscription sub = subscriptionIter.getNode().getSubscription();
                 if(sub != null)
                 {
-                    synchronized(sub.getSendLock())
+                    sub.getSendLock();
+                    try
                     {
+                        QueueEntry node = moveSubscriptionToNextNode(sub);
+
                         if(sub.isActive())
                         {
                             boolean advanced = false;
+                            boolean subActive = false;
 
-                            QueueEntry node = moveSubscriptionToNextNode(sub);
                             if(!(node.isAcquired() || node.isDeleted()))
                             {
                                 if(!sub.isSuspended())
                                 {
+                                    subActive = true;
                                     if(sub.hasInterest(node))
                                     {
                                         if(!sub.wouldSuspend(node))
@@ -1274,6 +1418,7 @@
                                             else
                                             {
                                                 deliverMessage(sub, node);
+                                                deliveries--;
 
                                                 if(sub.isBrowser())
                                                 {
@@ -1309,7 +1454,7 @@
                             }
                             final boolean atTail = (_entries.next(node) == null);
 
-                            done = done && atTail;
+                            done = done && (!subActive || atTail);
 
                             if(atTail && !advanced && sub.isAutoClose())
                             {
@@ -1322,6 +1467,10 @@
 
                         }
                     }
+                    finally
+                    {
+                        sub.releaseSendLock();
+                    }
                 }
                 if(done)
                 {
@@ -1346,6 +1495,10 @@
         }
 
 
+        if(deliveries == 0 && _asynchronousRunner.compareAndSet(null,runner))
+        {
+            _asyncDelivery.execute(runner);
+        }
     }
 
 

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Tue May 27 05:43:04 2008
@@ -28,7 +28,6 @@
 
 public interface Subscription
 {
-    boolean isActive();
 
 
     public static enum State
@@ -75,7 +74,7 @@
     boolean wouldSuspend(QueueEntry msg);
 
     Object getSendLock();
-
+    void releaseSendLock();
 
     void resend(final QueueEntry entry) throws AMQException;
 
@@ -87,4 +86,9 @@
 
     boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue);
 
+
+    boolean isActive();
+
+
+
 }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Tue May 27 05:43:04 2008
@@ -22,6 +22,9 @@
 
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
@@ -62,6 +65,9 @@
     private final RecordDeliveryMethod _recordMethod;
     
     private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+    private final Lock _stateChangeLock;
+    private final Lock _stateChangeExclusiveLock;
+
 
 
     static final class BrowserSubscription extends SubscriptionImpl
@@ -254,7 +260,8 @@
     private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
 
     private AMQQueue _queue;
-    private final AtomicBoolean _sendLock = new AtomicBoolean(false);
+    private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
 
 
     
@@ -280,7 +287,9 @@
         _deliveryMethod = deliveryMethod;
         _recordMethod = recordMethod;
 
-
+        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+        _stateChangeLock = readWriteLock.readLock();
+        _stateChangeExclusiveLock = readWriteLock.writeLock();
 
         if (arguments != null)
         {
@@ -334,7 +343,7 @@
 
     public boolean isSuspended()
     {
-        return !isActive() || _channel.isSuspended() || _sendLock.get();
+        return !isActive() || _channel.isSuspended() || _deleted.get();
     }
 
     /**
@@ -344,7 +353,7 @@
      */
     public void queueDeleted(AMQQueue queue)
     {
-        _sendLock.set(true);
+        _deleted.set(true);
 //        _channel.queueDeleted(queue);
     }
 
@@ -435,7 +444,9 @@
     {
         boolean closed = false;
         State state = getState();
-        synchronized (_sendLock)
+
+        _stateChangeExclusiveLock.lock();
+        try
         {
             while(!closed && state != State.CLOSED)
             {
@@ -451,6 +462,11 @@
             }
             _creditManager.removeListener(this);
         }
+        finally
+        {
+            _stateChangeExclusiveLock.unlock();
+        }
+
 
         if (closed)
         {
@@ -481,7 +497,13 @@
 
     public Object getSendLock()
     {
-        return _sendLock;
+        _stateChangeLock.lock();
+        return _deleted;
+    }
+
+    public void releaseSendLock()
+    {
+        _stateChangeLock.unlock();
     }
 
     public void resend(final QueueEntry entry) throws AMQException

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Tue May 27 05:43:04 2008
@@ -44,6 +44,7 @@
 
 import javax.management.Notification;
 import java.util.LinkedList;
+import java.util.Collections;
 
 /** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
 public class AMQQueueAlertTest extends TestCase
@@ -303,7 +304,7 @@
         for (int i = 0; i < messages.length; i++)
         {
             messages[i] = message(false, size);
-            messages[i].enqueue(_queue);
+            messages[i].enqueue(Collections.singleton(_queue));
             messages[i].routingComplete(_messageStore, new MessageHandleFactory());
 
         }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Tue May 27 05:43:04 2008
@@ -48,6 +48,7 @@
 
 import javax.management.JMException;
 import java.util.LinkedList;
+import java.util.Collections;
 
 /**
  * Test class to test AMQQueueMBean attributes and operations
@@ -216,7 +217,7 @@
         long id = msg.getMessageId();
         _queue.clearQueue(_storeContext);
 
-        msg.enqueue(_queue);
+        msg.enqueue(Collections.singleton(_queue));
         msg.routingComplete(_messageStore, new MessageHandleFactory());
 
         msg.addContentBodyFrame(new ContentChunk()
@@ -318,7 +319,7 @@
         for (int i = 0; i < messageCount; i++)
         {
             IncomingMessage currentMessage = message(false, persistent);
-            currentMessage.enqueue(_queue);
+            currentMessage.enqueue(Collections.singleton(_queue));
 
             // route header
             currentMessage.routingComplete(_messageStore, new MessageHandleFactory());

Modified: incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Tue May 27 05:43:04 2008
@@ -464,15 +464,49 @@
             return false;
         }
 
-        if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
+        final int hashCode = _hashCode;
+
+        final int otherHashCode = otherString._hashCode;
+
+        if ((hashCode != 0) && (otherHashCode != 0) && (hashCode != otherHashCode))
         {
             return false;
         }
 
+        final int length = _length;
+
+        if(length != otherString._length)
+        {
+            return false;
+        }
 
 
-        return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data))
-                || Arrays.equals(getBytes(),otherString.getBytes());
+        final byte[] data = _data;
+
+        final byte[] otherData = otherString._data;
+
+        final int offset = _offset;
+
+        final int otherOffset = otherString._offset;
+
+        if(offset == 0 && otherOffset == 0 && length == data.length && length == otherData.length)
+        {
+            return Arrays.equals(data, otherData);
+        }
+        else
+        {
+            int thisIdx = offset;
+            int otherIdx = otherOffset;
+            for(int i = length;  i-- != 0; )
+            {
+                if(!(data[thisIdx++] == otherData[otherIdx++]))
+                {
+                    return false;
+                }
+            }
+        }
+
+        return true;
 
     }
 

Modified: incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java Tue May 27 05:43:04 2008
@@ -41,16 +41,16 @@
 
     private final ReentrantLock _putLock = new ReentrantLock();
 
-    private final ConcurrentLinkedQueue<Job> _readJobQueue = new ConcurrentLinkedQueue<Job>();
+    private final ConcurrentLinkedQueue<ReadWriteRunnable> _readJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
 
-    private final ConcurrentLinkedQueue<Job> _writeJobQueue = new ConcurrentLinkedQueue<Job>();
+    private final ConcurrentLinkedQueue<ReadWriteRunnable> _writeJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
 
 
     private class ReadWriteJobIterator implements Iterator<Runnable>
     {
 
         private boolean _onReads;
-        private Iterator<Job> _iter = _writeJobQueue.iterator();
+        private Iterator<ReadWriteRunnable> _iter = _writeJobQueue.iterator();
 
         public boolean hasNext()
         {
@@ -112,12 +112,12 @@
 
     public boolean offer(final Runnable runnable)
     {
-        final Job job = (Job) runnable;
+        final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
         final ReentrantLock putLock = _putLock;
         putLock.lock();
         try
         {
-            if(job.isReadJob())
+            if(job.isRead())
             {
                 _readJobQueue.offer(job);
             }
@@ -147,13 +147,13 @@
 
     public void put(final Runnable runnable) throws InterruptedException
     {
-        final Job job = (Job) runnable;
+        final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
         final ReentrantLock putLock = _putLock;
         putLock.lock();
 
         try
         {
-            if(job.isReadJob())
+            if(job.isRead())
             {
                 _readJobQueue.offer(job);
             }
@@ -185,13 +185,13 @@
 
     public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException
     {
-        final Job job = (Job) runnable;
+        final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
         final ReentrantLock putLock = _putLock;
         putLock.lock();
 
         try
         {
-            if(job.isReadJob())
+            if(job.isRead())
             {
                 _readJobQueue.offer(job);
             }
@@ -240,7 +240,7 @@
                 throw ie;
             }
 
-            Job job = _writeJobQueue.poll();
+            ReadWriteRunnable job = _writeJobQueue.poll();
             if(job == null)
             {
                 job = _readJobQueue.poll();
@@ -266,7 +266,7 @@
         final AtomicInteger count = _count;
         long nanos = unit.toNanos(timeout);
         takeLock.lockInterruptibly();
-        Job job = null;
+        ReadWriteRunnable job = null;
         try
         {
 
@@ -322,7 +322,7 @@
         _takeLock.lock();
         try
         {
-            Job job;
+            ReadWriteRunnable job;
             while((job = _writeJobQueue.peek())!= null)
             {
                 c.add(job);
@@ -356,7 +356,7 @@
         _takeLock.lock();
         try
         {
-            Job job;
+            ReadWriteRunnable job;
             while(total<=maxElements && (job = _writeJobQueue.peek())!= null)
             {
                 c.add(job);
@@ -391,7 +391,7 @@
         {
             if(_count.get() > 0)
             {
-                Job job = _writeJobQueue.poll();
+                ReadWriteRunnable job = _writeJobQueue.poll();
                 if(job == null)
                 {
                     job = _readJobQueue.poll();
@@ -417,7 +417,7 @@
         takeLock.lock();
         try
         {
-            Job job = _writeJobQueue.peek();
+            ReadWriteRunnable job = _writeJobQueue.peek();
             if(job == null)
             {
                 job = _readJobQueue.peek();

Added: incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java?rev=660490&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java Tue May 27 05:43:04 2008
@@ -0,0 +1,27 @@
+package org.apache.qpid.pool;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public interface ReadWriteRunnable extends Runnable
+{
+    boolean isRead();
+    boolean isWrite();
+}

Modified: incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java Tue May 27 05:43:04 2008
@@ -110,7 +110,7 @@
      *
      * @return An executor service.
      */
-    ExecutorService acquireExecutorService()
+    public ExecutorService acquireExecutorService()
     {
         synchronized (_lock)
         {
@@ -140,7 +140,7 @@
      * Releases a reference to a shared executor service, decrementing the reference count. If the reference count falls
      * to zero, the executor service is shut down.
      */
-    void releaseExecutorService()
+    public void releaseExecutorService()
     {
         synchronized (_lock)
         {

Modified: incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java Tue May 27 05:43:04 2008
@@ -22,6 +22,8 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
 
 import javax.management.JMException;
 import javax.management.openmbean.OpenDataException;
@@ -201,8 +203,10 @@
         headers.put(key, value);
         ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers);
         AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue"));
-        
-        payload.enqueue(q);
+
+        Collection<AMQQueue> queues =  new ArrayList<AMQQueue>();
+        queues.add(q);
+        payload.enqueue(queues);
         
     }
 

Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Tue May 27 05:43:04 2008
@@ -42,6 +42,7 @@
 
 import java.util.LinkedList;
 import java.util.Set;
+import java.util.Collections;
 
 /**
  * Tests that acknowledgements are handled correctly.
@@ -145,7 +146,7 @@
             // we increment the reference here since we are not delivering the messaging to any queues, which is where
             // the reference is normally incremented. The test is easier to construct if we have direct access to the
             // subscription
-            msg.enqueue(_queue);
+            msg.enqueue(Collections.singleton(_queue));
             msg.routingComplete(_messageStore, factory);
             if(msg.allContentReceived())
             {

Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=660490&r1=660489&r2=660490&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Tue May 27 05:43:04 2008
@@ -91,6 +91,11 @@
         return new Object();
     }
 
+    public void releaseSendLock()
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public void resend(final QueueEntry entry)
     {
         //To change body of implemented methods use File | Settings | File Templates.