You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/08 11:10:11 UTC

svn commit: r619823 [8/19] - in /incubator/qpid/branches/thegreatmerge/qpid: ./ cpp/ dotnet/ dotnet/Qpid.Buffer.Tests/Properties/ dotnet/Qpid.Buffer/Properties/ dotnet/Qpid.Client.Tests/ dotnet/Qpid.Client.Tests/Channel/ dotnet/Qpid.Client.Tests/Common...

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Fri Feb  8 02:09:37 2008
@@ -55,7 +55,7 @@
                 defaultValue = "false")
     public boolean compressBufferOnQueue;
     /** Holds any queued messages */
-    private final MessageQueue<AMQMessage> _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
+    private final MessageQueue<QueueEntry> _messages = new ConcurrentLinkedMessageQueueAtomicSize<QueueEntry>();
 
     /** Ensures that only one asynchronous task is running for this manager at any time. */
     private final AtomicBoolean _processing = new AtomicBoolean();
@@ -107,8 +107,9 @@
     }
 
 
-    private boolean addMessageToQueue(AMQMessage msg, boolean deliverFirst)
+    private boolean addMessageToQueue(QueueEntry entry, boolean deliverFirst)
     {
+        AMQMessage msg = entry.getMessage();
         // Shrink the ContentBodies to their actual size to save memory.
         if (compressBufferOnQueue)
         {
@@ -124,12 +125,12 @@
         {
             synchronized (_queueHeadLock)
             {
-                _messages.pushHead(msg);
+                _messages.pushHead(entry);
             }
         }
         else
         {
-            _messages.offer(msg);
+            _messages.offer(entry);
         }
 
         _totalMessageSize.addAndGet(msg.getSize());
@@ -175,11 +176,11 @@
 
     public long getOldestMessageArrival()
     {
-        AMQMessage msg = _messages.peek();
-        return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
+        QueueEntry entry = _messages.peek();
+        return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
     }
 
-    public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg)
+    public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry entry)
     {
         _lock.lock();
         try
@@ -188,19 +189,19 @@
             {
                 _log.debug("Queue has adding subscriber content");
                 _hasContent.add(subscription);
-                _totalMessageSize.addAndGet(msg.getSize());
+                _totalMessageSize.addAndGet(entry.getSize());
                 _extraMessages.addAndGet(1);
             }
             else
             {
                 _log.debug("Queue has removing subscriber content");
-                if (msg == null)
+                if (entry == null)
                 {
                     _hasContent.remove(subscription);
                 }
                 else
                 {
-                    _totalMessageSize.addAndGet(-msg.getSize());
+                    _totalMessageSize.addAndGet(-entry.getSize());
                     _extraMessages.addAndGet(-1);
                 }
             }
@@ -212,18 +213,48 @@
     }
 
     /**
+     *  NOTE : This method should only be called when there are no active subscribers
+     */
+    public void removeExpired() throws AMQException
+    {
+        _lock.lock();
+
+
+	    for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();)
+        {
+            QueueEntry entry = iter.next();
+            if(entry.expired())
+            {
+                // fixme: Currently we have to update the total byte size here for the data in the queue  
+                _totalMessageSize.addAndGet(-entry.getSize());
+                _queue.dequeue(_reapingStoreContext,entry);
+                iter.remove();
+            }
+	    }
+
+
+        _lock.unlock();
+    }
+
+    /** @return the state of the async processor. */
+    public boolean isProcessingAsync()
+    {
+        return _processing.get();
+    }
+
+    /**
      * Returns all the messages in the Queue
      *
      * @return List of messages
      */
-    public List<AMQMessage> getMessages()
+    public List<QueueEntry> getMessages()
     {
         _lock.lock();
-        List<AMQMessage> list = new ArrayList<AMQMessage>();
+        List<QueueEntry> list = new ArrayList<QueueEntry>();
 
-        for (AMQMessage message : _messages)
+        for (QueueEntry entry : _messages)
         {
-            list.add(message);
+            list.add(entry);
         }
         _lock.unlock();
 
@@ -238,7 +269,7 @@
      *
      * @return
      */
-    public List<AMQMessage> getMessages(long fromMessageId, long toMessageId)
+    public List<QueueEntry> getMessages(long fromMessageId, long toMessageId)
     {
         if (fromMessageId <= 0 || toMessageId <= 0)
         {
@@ -249,14 +280,14 @@
 
         _lock.lock();
 
-        List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
+        List<QueueEntry> foundMessagesList = new ArrayList<QueueEntry>();
 
-        for (AMQMessage message : _messages)
+        for (QueueEntry entry : _messages)
         {
-            long msgId = message.getMessageId();
+            long msgId = entry.getMessage().getMessageId();
             if (msgId >= fromMessageId && msgId <= toMessageId)
             {
-                foundMessagesList.add(message);
+                foundMessagesList.add(entry);
             }
             // break if the no of messages are found
             if (foundMessagesList.size() == maxMessageCount)
@@ -276,22 +307,26 @@
             _log.trace("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")");
         }
 
-        Iterator<AMQMessage> currentQueue = _messages.iterator();
+        Iterator<QueueEntry> currentQueue = _messages.iterator();
 
         while (currentQueue.hasNext())
         {
-            AMQMessage message = currentQueue.next();
-            if (subscription.hasInterest(message))
+            QueueEntry entry = currentQueue.next();
+
+            if (!entry.getDeliveredToConsumer())
             {
-                subscription.enqueueForPreDelivery(message, false);
+                if (subscription.hasInterest(entry)) // TGM: should be and'd
+                {
+                    subscription.enqueueForPreDelivery(entry, false);
+                }
             }
         }
     }
 
     public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException
     {
-        AMQMessage msg = getNextMessage();
-        if (msg == null)
+        QueueEntry entry = getNextMessage();
+        if (entry == null)
         {
             return false;
         }
@@ -313,9 +348,9 @@
                 {
                     if (_log.isDebugEnabled())
                     {
-                        _log.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+                        _log.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId());
                     }
-                    _queue.dequeue(channel.getStoreContext(), msg);
+                    _queue.dequeue(channel.getStoreContext(), entry);
                 }
                 synchronized (channel)
                 {
@@ -323,17 +358,22 @@
 
                     if (acks)
                     {
-                        channel.addUnacknowledgedMessage(msg, deliveryTag, null, _queue);
+                        channel.addUnacknowledgedMessage(entry, deliveryTag, null);
                     }
 
-                    protocolSession.getProtocolOutputConverter().writeGetOk(msg, channel.getChannelId(),
+                    protocolSession.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
                                                                             deliveryTag, _queue.getMessageCount());
-                    _totalMessageSize.addAndGet(-msg.getSize());
+                    _totalMessageSize.addAndGet(-entry.getSize());
+                }
+
+                if (!acks)
+                {
+                    entry.getMessage().decrementReference(channel.getStoreContext());
                 }
             }
             finally
             {
-                msg.setDeliveredToConsumer();
+                entry.setDeliveredToConsumer();
             }
             return true;
 
@@ -367,7 +407,7 @@
      *
      * @param messageList
      */
-    public void removeMovedMessages(List<AMQMessage> messageList)
+    public void removeMovedMessages(List<QueueEntry> messageList)
     {
         // Remove from the
         boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
@@ -377,20 +417,20 @@
             {
                 if (!sub.isSuspended() && sub.filtersMessages())
                 {
-                    Queue<AMQMessage> preDeliveryQueue = sub.getPreDeliveryQueue();
-                    for (AMQMessage msg : messageList)
+                    Queue<QueueEntry> preDeliveryQueue = sub.getPreDeliveryQueue();
+                    for (QueueEntry entry : messageList)
                     {
-                        preDeliveryQueue.remove(msg);
+                        preDeliveryQueue.remove(entry);
                     }
                 }
             }
         }
 
-        for (AMQMessage msg : messageList)
+        for (QueueEntry entry : messageList)
         {
-            if (_messages.remove(msg))
+            if (_messages.remove(entry))
             {
-                _totalMessageSize.getAndAdd(-msg.getSize());
+                _totalMessageSize.getAndAdd(-entry.getSize());
             }
         }
     }
@@ -406,10 +446,16 @@
     {
         _lock.lock();
 
-        AMQMessage message = _messages.poll();
-        if (message != null)
+        QueueEntry entry = _messages.poll();
+        if (entry != null)
         {
-            _totalMessageSize.addAndGet(-message.getSize());
+            _queue.dequeue(storeContext, entry);
+
+            _totalMessageSize.addAndGet(-entry.getSize());
+
+            //The size must be adjusted before this call: if it causes the ref count to hit zero then the data will be purged and getSize() will NPE.
+            entry.getMessage().decrementReference(storeContext);
+
         }
 
         _lock.unlock();
@@ -422,14 +468,16 @@
 
         synchronized (_queueHeadLock)
         {
-            AMQMessage msg = getNextMessage();
-            while (msg != null)
+            QueueEntry entry = getNextMessage();
+            while (entry != null)
             {
                 //and remove it
                 _messages.poll();
 
-                _queue.dequeue(storeContext, msg);
-                msg = getNextMessage();
+                _queue.dequeue(storeContext, entry);
+                entry.getMessage().decrementReference(_reapingStoreContext);
+
+                entry = getNextMessage();
                 count++;
             }
             _totalMessageSize.set(0L);
@@ -445,34 +493,35 @@
      *
      * @throws org.apache.qpid.AMQException
      */
-    private AMQMessage getNextMessage() throws AMQException
+    private QueueEntry getNextMessage() throws AMQException
     {
-        return getNextMessage(_messages, null);
+        return getNextMessage(_messages, null, false);
     }
 
-    private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) throws AMQException
+    private QueueEntry getNextMessage(Queue<QueueEntry> messages, Subscription sub, boolean purgeOnly) throws AMQException
     {
-        AMQMessage message = messages.peek();
+        QueueEntry entry = messages.peek();
 
         //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
-        while (purgeMessage(message, sub))
+        while (purgeMessage(entry, sub, purgeOnly))
         {
+            AMQMessage message = entry.getMessage();
             // if we are purging then ensure we mark this message taken for the current subscriber
             // the current subscriber may be null in the case of a get or a purge but this is ok.
 //            boolean alreadyTaken = message.taken(_queue, sub);
 
             //remove the already taken message or expired
-            AMQMessage removed = messages.poll();
+            QueueEntry removed = messages.poll();
 
-            assert removed == message;
+            assert removed == entry;
 
             // if the message expired then the _totalMessageSize needs adjusting
-            if (message.expired(_queue))
+            if (message.expired(_queue) && !entry.getDeliveredToConsumer())
             {
-                _totalMessageSize.addAndGet(-message.getSize());
+                _totalMessageSize.addAndGet(-entry.getSize());
 
                 // Use the reapingStoreContext as any sub(if we have one) may be in a tx.
-                message.dequeue(_reapingStoreContext, _queue);
+                _queue.dequeue(_reapingStoreContext, entry);
 
                 if (_log.isInfoEnabled())
                 {
@@ -489,10 +538,10 @@
             }
 
             // try the next message
-            message = messages.peek();
+            entry = messages.peek();
         }
 
-        return message;
+        return entry;
     }
 
     /**
@@ -505,7 +554,25 @@
      * @return
      * @throws AMQException
      */
-    private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException
+    private boolean purgeMessage(QueueEntry message, Subscription sub) throws AMQException
+    {
+        return purgeMessage(message, sub, false);
+    }
+
+    /**
+     * This method will return true if the message is to be purged from the queue.
+     *
+     * SIDE-EFFECT: The msg will be taken by the Subscription(sub) for the current Queue(_queue) when purgeOnly is false
+     *
+     * @param message
+     * @param sub
+     * @param purgeOnly When set to false the message will be taken by the given Subscription.
+     *
+     * @return if the msg should be purged
+     *
+     * @throws AMQException
+     */
+    private boolean purgeMessage(QueueEntry message, Subscription sub, boolean purgeOnly) throws AMQException
     {
         //Original.. complicated while loop control
 //                (message != null
@@ -520,7 +587,7 @@
         if (message != null)
         {
             // Check that the message hasn't expired.
-            if (message.expired(_queue))
+            if (message.expired())
             {
                 return true;
             }
@@ -529,27 +596,36 @@
             if (sub != null)
             {
                 // if we have a queue browser(we don't purge) so check mark the message as taken
-                purge = ((!sub.isBrowser() || message.isTaken(_queue)));
+                purge = ((!sub.isBrowser() || message.isTaken()));
             }
             else
             {
                 // if there is no subscription we are doing
                 // a get or purging so mark message as taken.
-                message.isTaken(_queue);
+                message.isTaken();
                 // and then ensure that it gets purged
                 purge = true;
             }
         }
 
-        // if we are purging then ensure we mark this message taken for the current subscriber
-        // the current subscriber may be null in the case of a get or a purge but this is ok.
-        return purge && message.taken(_queue, sub);
+        if (purgeOnly)
+        {
+            // If we are simply purging the queue don't take the message
+            // just purge up to the next non-taken msg.
+            return purge && message.isTaken();
+        }
+        else
+        {
+            // if we are purging then ensure we mark this message taken for the current subscriber
+            // the current subscriber may be null in the case of a get or a purge but this is ok.
+            return purge && message.taken(sub);
+        }
     }
 
-    public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue)
+    public void sendNextMessage(Subscription sub, AMQQueue queue)
     {
 
-        Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages);
+        Queue<QueueEntry> messageQueue = sub.getNextQueue(_messages);
 
         if (_log.isTraceEnabled())
         {
@@ -568,16 +644,16 @@
             return;
         }
 
-        AMQMessage message = null;
-        AMQMessage removed = null;
+        QueueEntry entry = null;
+        QueueEntry removed = null;
         try
         {
             synchronized (_queueHeadLock)
             {
-                message = getNextMessage(messageQueue, sub);
+                entry = getNextMessage(messageQueue, sub, false);
 
                 // message will be null if we have no messages in the messageQueue.
-                if (message == null)
+                if (entry == null)
                 {
                     if (_log.isTraceEnabled())
                     {
@@ -587,12 +663,17 @@
                 }
                 if (_log.isDebugEnabled())
                 {
-                    _log.debug(debugIdentity() + "Async Delivery Message :" + message + "(" + System.identityHashCode(message) +
+                    _log.debug(debugIdentity() + "Async Delivery Message :" + entry + "(" + System.identityHashCode(entry) +
                                ") by :" + System.identityHashCode(this) +
                                ") to :" + System.identityHashCode(sub));
                 }
 
-                sub.send(message, _queue);
+                if (messageQueue == _messages)
+                {
+                    _totalMessageSize.addAndGet(-entry.getSize());
+                }
+
+                sub.send(entry, _queue);
 
                 //remove sent message from our queue.
                 removed = messageQueue.poll();
@@ -600,14 +681,14 @@
                 // Otherwise the Async send will never end
             }
 
-            if (removed != message)
+            if (removed != entry)
             {
-                _log.error("Just send message:" + message.debugIdentity() + " BUT removed this from queue:" + removed);
+                _log.error("Just send message:" + entry.getMessage().debugIdentity() + " BUT removed this from queue:" + removed);
             }
 
             if (_log.isDebugEnabled())
             {
-                _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message +
+                _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.getMessage().debugIdentity() + "d:" + entry +
                            ") by :" + System.identityHashCode(this) +
                            ") to :" + System.identityHashCode(sub));
             }
@@ -639,16 +720,12 @@
                 }
             }
 
-            if ((message != null) && (messageQueue == _messages))
-            {
-                _totalMessageSize.addAndGet(-message.getSize());
-            }
         }
         catch (AMQException e)
         {
-            if (message != null)
+            if (entry != null)
             {
-                message.release(_queue);
+                entry.release();
             }
             else
             {
@@ -664,23 +741,23 @@
      * @param storeContext
      * @param movedMessageList
      */
-    public void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> movedMessageList)
+    public void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> movedMessageList)
     {
         _lock.lock();
-        for (AMQMessage msg : movedMessageList)
+        for (QueueEntry entry : movedMessageList)
         {
-            addMessageToQueue(msg, false);
+            addMessageToQueue(entry, false);
         }
 
         // enqueue on the pre delivery queues
         for (Subscription sub : _subscriptions.getSubscriptions())
         {
-            for (AMQMessage msg : movedMessageList)
+            for (QueueEntry entry : movedMessageList)
             {
                 // Only give the message to those that want them.
-                if (sub.hasInterest(msg))
+                if (sub.hasInterest(entry))
                 {
-                    sub.enqueueForPreDelivery(msg, true);
+                    sub.enqueueForPreDelivery(entry, true);
                 }
             }
         }
@@ -732,30 +809,30 @@
 
     }
 
-    public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException
+    public void deliver(StoreContext context, AMQShortString name, QueueEntry entry, boolean deliverFirst) throws AMQException
     {
 
         final boolean debugEnabled = _log.isDebugEnabled();
         if (debugEnabled)
         {
-            _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg);
+            _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + entry);
         }
 
         //Check if we have someone to deliver the message to.
         _lock.lock();
         try
         {
-            Subscription s = _subscriptions.nextSubscriber(msg);
+            Subscription s = _subscriptions.nextSubscriber(entry);
 
             if (s == null || hasQueuedMessages()) //no-one can take the message right now or we're queueing
             {
                 if (debugEnabled)
                 {
-                    _log.debug(debugIdentity() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus());
+                    _log.debug(debugIdentity() + "Testing Message(" + entry + ") for Queued Delivery:" + currentStatus());
                 }
-                if (!msg.getMessagePublishInfo().isImmediate())
+                if (!entry.getMessage().getMessagePublishInfo().isImmediate())
                 {
-                    addMessageToQueue(msg, deliverFirst);
+                    addMessageToQueue(entry, deliverFirst);
 
                     //release lock now message is on queue.
                     _lock.unlock();
@@ -770,25 +847,25 @@
                     {
 
                         // stop if the message gets delivered whilst PreDelivering if we have a shared queue.
-                        if (_queue.isShared() && msg.getDeliveredToConsumer())
+                        if (_queue.isShared() && entry.getDeliveredToConsumer())
                         {
                             if (debugEnabled)
                             {
-                                _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) +
+                                _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(entry) +
                                            ") is already delivered.");
                             }
                             continue;
                         }
 
                         // Only give the message to those that want them.
-                        if (sub.hasInterest(msg))
+                        if (sub.hasInterest(entry))
                         {
                             if (debugEnabled)
                             {
-                                _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(msg) +
+                                _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(entry) +
                                            ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
                             }
-                            sub.enqueueForPreDelivery(msg, deliverFirst);
+                            sub.enqueueForPreDelivery(entry, deliverFirst);
                         }
                     }
                 }
@@ -809,38 +886,67 @@
                     {
                         if (_log.isTraceEnabled())
                         {
-                            _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" +
+                            _log.trace(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" +
                                        System.identityHashCode(s) + ") :" + s);
                         }
-                        msg.taken(_queue, s);
+
+                        if (entry.taken(s))
+                        {
+                            // Message has already been delivered, so don't redeliver it.
+                            // This can currently occur because of the recursive call below.
+                            // During unit tests the following sequence has been seen: the send occurs,
+                            // the client then rejects the message,
+                            // and that reject releases the message before the
+                            // if (!entry.isTaken()) check below is made.
+                            // As the message has been released, that thread loops to send the message again;
+                            // by the time it gets back to here, the thread that released the
+                            // message is also ready to send it. Here is a sample trace for reference:
+//1192627162613:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]
+//1192627162613:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162613:Thread[pool-917-thread-4,5,main]:28398657 Sent :dt:214 msg:(HC:5529738 ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Reject message by:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]
+//1192627162613:Thread[pool-917-thread-2,5,main]:Releasing Message:(HC:5529738 ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Msg:Release:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:This:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]}
+//1192627162613:Thread[pool-917-thread-2,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:suspended: Message((HC:5529738 ID:145 Ref:1)) has not been taken so recursing!: Subscriber:28398657
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]
+//1192627162629:Thread[pool-917-thread-2,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162629:Thread[pool-917-thread-2,5,main]:25386607 Sent :dt:172 msg:(HC:5529738 ID:145 Ref:1)
+//1192627162629:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=true} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]}
+                            // Note: In the last request to take the message from thread 4,5 the message has been
+                            // taken by the previous call done by thread 2,5
+
+
+                            return;
+                        }
                         //Deliver the message
-                        s.send(msg, _queue);
+                        s.send(entry, _queue);
                     }
                     else
                     {
                         if (debugEnabled)
                         {
                             _log.debug(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " +
-                                       "suspended between nextSubscriber and send for message:" + msg.debugIdentity());
+                                       "suspended between nextSubscriber and send for message:" + entry.getMessage().debugIdentity());
                         }
                     }
                 }
 
-                if (!msg.isTaken(_queue))
+                if (!entry.isTaken())
                 {
                     if (debugEnabled)
                     {
-                        _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" +
+                        _log.debug(debugIdentity() + " Message(" + entry.getMessage().debugIdentity() + ") has not been taken so recursing!:" +
                                    " Subscriber:" + System.identityHashCode(s));
                     }
 
-                    deliver(context, name, msg, deliverFirst);
+                    deliver(context, name, entry, deliverFirst);
                 }
                 else
                 {
                     if (debugEnabled)
                     {
-                        _log.debug(debugIdentity() + " Message(" + msg.toString() +
+                        _log.debug(debugIdentity() + " Message(" + entry.toString() +
                                    ") has been taken so disregarding deliver request to Subscriber:" +
                                    System.identityHashCode(s));
                     }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Fri Feb  8 02:09:37 2008
@@ -64,13 +64,13 @@
      *
      * @param storeContext
      * @param name         the name of the entity on whose behalf we are delivering the message
-     * @param msg          the message to deliver
+     * @param entry        the queue entry wrapping the message to deliver
      * @param deliverFirst
      *
      * @throws org.apache.qpid.server.queue.FailedDequeueException
      *          if the message could not be dequeued
      */
-    void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws FailedDequeueException, AMQException;
+    void deliver(StoreContext storeContext, AMQShortString name, QueueEntry entry, boolean deliverFirst) throws FailedDequeueException, AMQException;
 
     void removeAMessageFromTop(StoreContext storeContext) throws AMQException;
 
@@ -78,15 +78,15 @@
 
     void startMovingMessages();
 
-    void enqueueMovedMessages(StoreContext context, List<AMQMessage> messageList);
+    void enqueueMovedMessages(StoreContext context, List<QueueEntry> messageList);
 
     void stopMovingMessages();
 
-    void removeMovedMessages(List<AMQMessage> messageListToRemove);
+    void removeMovedMessages(List<QueueEntry> messageListToRemove);
 
-    List<AMQMessage> getMessages();
+    List<QueueEntry> getMessages();
 
-    List<AMQMessage> getMessages(long fromMessageId, long toMessageId);
+    List<QueueEntry> getMessages(long fromMessageId, long toMessageId);
 
     void populatePreDeliveryQueue(Subscription subscription);
 
@@ -96,5 +96,7 @@
 
     long getOldestMessageArrival();
 
-    void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg);
+    void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry msg);
+
+    void removeExpired() throws AMQException;
 }

Added: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Fri Feb  8 02:09:37 2008
@@ -0,0 +1,173 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.log4j.Logger;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+public class QueueEntry
+{
+
+    /**
+     * Used for debugging purposes.
+     */
+    private static final Logger _log = Logger.getLogger(QueueEntry.class);
+
+    private final AMQQueue _queue;
+    private final AMQMessage _message;
+
+    private Set<Subscription> _rejectedBy = null;
+
+    private AtomicReference<Object> _owner = new AtomicReference<Object>();
+
+
+    public QueueEntry(AMQQueue queue, AMQMessage message)
+    {
+        _queue = queue;
+        _message = message;
+    }
+
+
+    public AMQQueue getQueue()
+    {
+        return _queue;
+    }
+
+    public AMQMessage getMessage()
+    {
+        return _message;
+    }
+
+    public long getSize()
+    {
+        return getMessage().getSize();
+    }
+
+    public boolean getDeliveredToConsumer()
+    {
+        return getMessage().getDeliveredToConsumer();
+    }
+
+    public boolean expired() throws AMQException
+    {
+        return getMessage().expired(_queue);
+    }
+
+    public boolean isTaken()
+    {
+        return _owner.get() != null;
+    }
+
+    public boolean taken(Subscription sub)
+    {
+        return !(_owner.compareAndSet(null, sub == null ? this : sub));
+    }
+
+    public void setDeliveredToConsumer()
+    {
+        getMessage().setDeliveredToConsumer();
+    }
+
+    public void release()
+    {
+        _owner.set(null);
+    }
+
+    public String debugIdentity()
+    {
+        return getMessage().debugIdentity();
+    }
+
+    public void process(StoreContext storeContext, boolean deliverFirst) throws AMQException
+    {
+        _queue.process(storeContext, this, deliverFirst);
+    }
+
+    public void checkDeliveredToConsumer() throws NoConsumersException
+    {
+        _message.checkDeliveredToConsumer();
+    }
+
+    public void setRedelivered(boolean b)
+    {
+        getMessage().setRedelivered(b);
+    }
+
+    public Subscription getDeliveredSubscription()
+    {
+        synchronized (this)
+        {
+            Object owner = _owner.get();
+            if (owner instanceof Subscription)
+            {
+                return (Subscription) owner;
+            }
+            else
+            {
+                return null;
+            }
+        }
+    }
+
+    public void reject()
+    {
+        reject(getDeliveredSubscription());
+    }
+
+    public void reject(Subscription subscription)
+    {
+        if (subscription != null)
+        {
+            if (_rejectedBy == null)
+            {
+                _rejectedBy = new HashSet<Subscription>();
+            }
+
+            _rejectedBy.add(subscription);
+        }
+        else
+        {
+            _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
+        }
+    }
+
+    public boolean isRejectedBy(Subscription subscription)
+    {
+        boolean rejected = _rejectedBy != null;
+
+        if (rejected) // We have subscriptions that rejected this message
+        {
+            return _rejectedBy.contains(subscription);
+        }
+        else // This message hasn't been rejected yet.
+        {
+            return rejected;
+        }
+    }
+
+
+}

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Fri Feb  8 02:09:37 2008
@@ -27,7 +27,7 @@
 
 public interface Subscription
 {
-    void send(AMQMessage msg, AMQQueue queue) throws AMQException;
+    void send(QueueEntry msg, AMQQueue queue) throws AMQException;
 
     boolean isSuspended();
 
@@ -35,15 +35,15 @@
 
     boolean filtersMessages();
 
-    boolean hasInterest(AMQMessage msg);
+    boolean hasInterest(QueueEntry msg);
 
-    Queue<AMQMessage> getPreDeliveryQueue();
+    Queue<QueueEntry> getPreDeliveryQueue();
 
-    Queue<AMQMessage> getResendQueue();
+    Queue<QueueEntry> getResendQueue();
 
-    Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages);    
+    Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages);
 
-    void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst);
+    void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst);
 
     boolean isAutoClose();
 
@@ -53,9 +53,9 @@
 
     boolean isBrowser();
 
-    boolean wouldSuspend(AMQMessage msg);
+    boolean wouldSuspend(QueueEntry msg);
 
-    void addToResendQueue(AMQMessage msg);
+    void addToResendQueue(QueueEntry msg);
 
     Object getSendLock();
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Fri Feb  8 02:09:37 2008
@@ -58,9 +58,9 @@
 
     private final Object _sessionKey;
 
-    private MessageQueue<AMQMessage> _messages;
+    private MessageQueue<QueueEntry> _messages;
 
-    private Queue<AMQMessage> _resendQueue;
+    private Queue<QueueEntry> _resendQueue;
 
     private final boolean _noLocal;
 
@@ -160,7 +160,7 @@
 
         if (filtersMessages())
         {
-            _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
+            _messages = new ConcurrentLinkedMessageQueueAtomicSize<QueueEntry>();
         }
         else
         {
@@ -226,7 +226,7 @@
      *
      * @throws AMQException
      */
-    public void send(AMQMessage msg, AMQQueue queue) throws AMQException
+    public void send(QueueEntry msg, AMQQueue queue) throws AMQException
     {
         if (msg != null)
         {
@@ -245,7 +245,7 @@
         }
     }
 
-    private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws AMQException
+    private void sendToBrowser(QueueEntry msg, AMQQueue queue) throws AMQException
     {
         // We don't decrement the reference here as we don't want to consume the message
         // but we do want to send it to the client.
@@ -266,11 +266,11 @@
                 _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
             }
 
-            protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+            protocolSession.getProtocolOutputConverter().writeDeliver(msg.getMessage(), channel.getChannelId(), deliveryTag, consumerTag);
         }
     }
 
-    private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue)
+    private void sendToConsumer(StoreContext storeContext, QueueEntry entry, AMQQueue queue)
             throws AMQException
     {
         try
@@ -287,9 +287,9 @@
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+                    _logger.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId());
                 }
-                queue.dequeue(storeContext, msg);
+                queue.dequeue(storeContext, entry);
             }
 
             synchronized (channel)
@@ -298,16 +298,20 @@
 
                 if (_sendLock.get())
                 {
-                    _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
+                    _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
                 }
 
                 if (_acks)
                 {
-                    channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+                    channel.addUnacknowledgedMessage(entry, deliveryTag, consumerTag);
                 }
 
-                protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+                protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag);
 
+                if (!_acks)
+                {
+                    entry.getMessage().decrementReference(storeContext);
+                }
             }
         }
         finally
@@ -316,7 +320,7 @@
             // using a try->finally would set it even if an error occured.
             // Is this what we want? 
 
-            msg.setDeliveredToConsumer();
+            entry.setDeliveredToConsumer();
         }
     }
 
@@ -351,19 +355,19 @@
         return _filters != null || _noLocal;
     }
 
-    public boolean hasInterest(AMQMessage msg)
+    public boolean hasInterest(QueueEntry entry)
     {
         //check that the message hasn't been rejected
-        if (msg.isRejectedBy(this))
+        if (entry.isRejectedBy(this))
         {
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + msg.debugIdentity());
+                _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity());
             }
 //            return false;
         }
 
-        final AMQProtocolSession publisher = msg.getPublisher();
+        final AMQProtocolSession publisher = entry.getMessage().getPublisher();
 
         //todo - client id should be recoreded and this test removed but handled below
         if (_noLocal && publisher != null)
@@ -414,9 +418,9 @@
 
         if (_logger.isTraceEnabled())
         {
-            _logger.trace("(" + debugIdentity() + ") checking filters for message (" + msg.debugIdentity());
+            _logger.trace("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
         }
-        return checkFilters(msg);
+        return checkFilters(entry);
 
     }
 
@@ -427,7 +431,7 @@
         return id;
     }
 
-    private boolean checkFilters(AMQMessage msg)
+    private boolean checkFilters(QueueEntry msg)
     {
         if (_filters != null)
         {
@@ -435,7 +439,7 @@
 //            {
 //                _logger.trace("(" + debugIdentity() + ") has filters.");
 //            }
-            return _filters.allAllow(msg);
+            return _filters.allAllow(msg.getMessage());
         }
         else
         {
@@ -448,12 +452,12 @@
         }
     }
 
-    public Queue<AMQMessage> getPreDeliveryQueue()
+    public Queue<QueueEntry> getPreDeliveryQueue()
     {
         return _messages;
     }
 
-    public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst)
+    public void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst)
     {
         if (_messages != null)
         {
@@ -557,19 +561,19 @@
 
             while (!_resendQueue.isEmpty())
             {
-                AMQMessage resent = _resendQueue.poll();
+                QueueEntry resent = _resendQueue.poll();
 
                 if (_logger.isTraceEnabled())
                 {
                     _logger.trace("Removed for resending:" + resent.debugIdentity());
                 }
 
-                resent.release(_queue);
+                resent.release();
                 _queue.subscriberHasPendingResend(false, this, resent);
 
                 try
                 {
-                    channel.getTransactionalContext().deliver(resent, _queue, true);
+                    channel.getTransactionalContext().deliver(resent, true);
                 }
                 catch (AMQException e)
                 {
@@ -607,22 +611,22 @@
         return _isBrowser;
     }
 
-    public boolean wouldSuspend(AMQMessage msg)
+    public boolean wouldSuspend(QueueEntry msg)
     {
-        return channel.wouldSuspend(msg);
+        return channel.wouldSuspend(msg.getMessage());
     }
 
-    public Queue<AMQMessage> getResendQueue()
+    public Queue<QueueEntry> getResendQueue()
     {
         if (_resendQueue == null)
         {
-            _resendQueue = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+            _resendQueue = new ConcurrentLinkedQueueAtomicSize<QueueEntry>();
         }
         return _resendQueue;
     }
 
 
-    public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+    public Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages)
     {
         if (_resendQueue != null && !_resendQueue.isEmpty())
         {
@@ -647,7 +651,7 @@
         }
     }
 
-    public void addToResendQueue(AMQMessage msg)
+    public void addToResendQueue(QueueEntry msg)
     {
         // add to our resend queue
         getResendQueue().add(msg);

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java Fri Feb  8 02:09:37 2008
@@ -30,5 +30,5 @@
 {
     public List<Subscription> getSubscriptions();
     public boolean hasActiveSubscribers();
-    public Subscription nextSubscriber(AMQMessage msg);
+    public Subscription nextSubscriber(QueueEntry entry);
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Fri Feb  8 02:09:37 2008
@@ -37,7 +37,8 @@
 
     /** Used to control the round robin delivery of content */
     private int _currentSubscriber;
-    private final Object _subscriptionsChange = new Object();
+
+    private final Object _changeLock = new Object();
 
 
     /** Accessor for unit tests. */
@@ -48,7 +49,7 @@
 
     public void addSubscriber(Subscription subscription)
     {
-        synchronized (_subscriptionsChange)
+        synchronized (_changeLock)
         {
             _subscriptions.add(subscription);
         }
@@ -66,7 +67,7 @@
         // TODO: possibly need O(1) operation here.
 
         Subscription sub = null;
-        synchronized (_subscriptionsChange)
+        synchronized (_changeLock)
         {
             int subIndex = _subscriptions.indexOf(subscription);
 
@@ -113,7 +114,7 @@
      * concurrently. Also note that because of race conditions and when subscriptions are removed between calls to
      * nextSubscriber, the IndexOutOfBoundsException also causes the scan to start at the beginning.
      */
-    public Subscription nextSubscriber(AMQMessage msg)
+    public Subscription nextSubscriber(QueueEntry msg)
     {
         if (_subscriptions.isEmpty())
         {
@@ -140,7 +141,7 @@
         }
     }
 
-    private Subscription nextSubscriberImpl(AMQMessage msg)
+    private Subscription nextSubscriberImpl(QueueEntry msg)
     {
         final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
         while (iterator.hasNext())
@@ -226,4 +227,11 @@
     {
         return _subscriptions.size();
     }
+
+
+    public Object getChangeLock()
+    {
+        return _changeLock;
+    }
+    
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Fri Feb  8 02:09:37 2008
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,15 +20,14 @@
  */
 package org.apache.qpid.server.registry;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
 import org.apache.commons.configuration.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.configuration.Configurator;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * An abstract application registry that provides access to configuration information and handles the
  * construction and caching of configurable objects.
@@ -59,24 +58,7 @@
         public void run()
         {
             _logger.info("Shutting down application registries...");
-            try
-            {
-                synchronized (_instanceMap)
-                {
-                    Iterator<IApplicationRegistry> keyIterator = _instanceMap.values().iterator();
-
-                    while (keyIterator.hasNext())
-                    {
-                        IApplicationRegistry instance = keyIterator.next();
-
-                        instance.close();
-                    }
-                }
-            }
-            catch (Exception e)
-            {
-                _logger.error("Error shutting down message store: " + e, e);
-            }
+            removeAll();
         }
     }
 
@@ -121,6 +103,7 @@
             }
             catch (Exception e)
             {
+            _logger.error("Error shutting down message store: " + e, e);
 
             }
             finally
@@ -130,6 +113,14 @@
         }
     }
 
+    public static void removeAll()
+    {
+        Object[] keys = _instanceMap.keySet().toArray();
+        for (Object k : keys)
+        {
+            remove((Integer) k);
+        }
+    }
 
     protected ApplicationRegistry(Configuration configuration)
     {
@@ -173,7 +164,7 @@
 
     public void close() throws Exception
     {
-        for(VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts())
+        for (VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts())
         {
             virtualHost.close();
         }
@@ -210,7 +201,6 @@
         return instance;
     }
 
-    
 
     public static void setDefaultApplicationRegistry(String clazz)
     {

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Fri Feb  8 02:09:37 2008
@@ -34,6 +34,7 @@
 import org.apache.qpid.server.management.ManagedObjectRegistry;
 import org.apache.qpid.server.management.ManagementConfiguration;
 import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.apache.qpid.server.plugins.PluginManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager;
 import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
@@ -60,6 +61,8 @@
 
     private final Map<String, VirtualHost> _virtualHosts = new ConcurrentHashMap<String, VirtualHost>();
 
+    private PluginManager _pluginManager;
+
 
     public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
     {
@@ -117,6 +120,8 @@
 
         _managedObjectRegistry.start();
 
+        _pluginManager = new PluginManager(_configuration.getString("plugin-directory"));
+        
         initialiseVirtualHosts();
 
     }
@@ -172,5 +177,10 @@
     public Collection<String> getVirtualHostNames()
     {
         return getConfiguration().getList("virtualhosts.virtualhost.name");
+    }
+    
+    public PluginManager getPluginManager()
+    {
+        return _pluginManager;
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Fri Feb  8 02:09:37 2008
@@ -24,6 +24,7 @@
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.plugins.PluginManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
 import org.apache.qpid.server.security.access.AccessManager;
@@ -68,4 +69,7 @@
     VirtualHostRegistry getVirtualHostRegistry();
 
     AccessManager getAccessManager();
+    
+    PluginManager getPluginManager();
+    
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Fri Feb  8 02:09:37 2008
@@ -28,37 +28,11 @@
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicGetBody;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.BasicRejectBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeleteBody;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueuePurgeBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxSelectBody;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.handler.BasicAckMethodHandler;
 import org.apache.qpid.server.handler.BasicCancelMethodHandler;
 import org.apache.qpid.server.handler.BasicConsumeMethodHandler;
@@ -107,43 +81,35 @@
      * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
      * AMQFrame.
      */
-    private final EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap =
+/*    private final EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap =
         new EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>(
             AMQState.class);
+  */
+
 
     private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
 
     public AMQStateManager(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
     {
-        this(AMQState.CONNECTION_NOT_STARTED, true, virtualHostRegistry, protocolSession);
-    }
 
-    protected AMQStateManager(AMQState initial, boolean register, VirtualHostRegistry virtualHostRegistry,
-        AMQProtocolSession protocolSession)
-    {
         _virtualHostRegistry = virtualHostRegistry;
         _protocolSession = protocolSession;
-        _currentState = initial;
-        if (register)
-        {
-            registerListeners();
-        }
+        _currentState = AMQState.CONNECTION_NOT_STARTED;
+
     }
 
+    /*
     protected void registerListeners()
     {
         Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> frame2handlerMap;
 
         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
-        frame2handlerMap.put(ConnectionStartOkBody.class, ConnectionStartOkMethodHandler.getInstance());
         _state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap);
 
         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
-        frame2handlerMap.put(ConnectionSecureOkBody.class, ConnectionSecureOkMethodHandler.getInstance());
         _state2HandlersMap.put(AMQState.CONNECTION_NOT_AUTH, frame2handlerMap);
 
         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
-        frame2handlerMap.put(ConnectionTuneOkBody.class, ConnectionTuneOkMethodHandler.getInstance());
         _state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap);
 
         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
@@ -154,37 +120,41 @@
         // ConnectionOpen handlers
         //
         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
-        frame2handlerMap.put(ChannelOpenBody.class, ChannelOpenHandler.getInstance());
-        frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseHandler.getInstance());
-        frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkHandler.getInstance());
-        frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
-        frame2handlerMap.put(ExchangeDeclareBody.class, ExchangeDeclareHandler.getInstance());
-        frame2handlerMap.put(ExchangeDeleteBody.class, ExchangeDeleteHandler.getInstance());
-        frame2handlerMap.put(ExchangeBoundBody.class, ExchangeBoundHandler.getInstance());
-        frame2handlerMap.put(BasicAckBody.class, BasicAckMethodHandler.getInstance());
-        frame2handlerMap.put(BasicRecoverBody.class, BasicRecoverMethodHandler.getInstance());
-        frame2handlerMap.put(BasicConsumeBody.class, BasicConsumeMethodHandler.getInstance());
-        frame2handlerMap.put(BasicGetBody.class, BasicGetMethodHandler.getInstance());
-        frame2handlerMap.put(BasicCancelBody.class, BasicCancelMethodHandler.getInstance());
-        frame2handlerMap.put(BasicPublishBody.class, BasicPublishMethodHandler.getInstance());
-        frame2handlerMap.put(BasicQosBody.class, BasicQosHandler.getInstance());
-        frame2handlerMap.put(QueueBindBody.class, QueueBindHandler.getInstance());
-        frame2handlerMap.put(QueueDeclareBody.class, QueueDeclareHandler.getInstance());
-        frame2handlerMap.put(QueueDeleteBody.class, QueueDeleteHandler.getInstance());
-        frame2handlerMap.put(QueuePurgeBody.class, QueuePurgeHandler.getInstance());
-        frame2handlerMap.put(ChannelFlowBody.class, ChannelFlowHandler.getInstance());
-        frame2handlerMap.put(TxSelectBody.class, TxSelectHandler.getInstance());
-        frame2handlerMap.put(TxCommitBody.class, TxCommitHandler.getInstance());
-        frame2handlerMap.put(TxRollbackBody.class, TxRollbackHandler.getInstance());
-        frame2handlerMap.put(BasicRejectBody.class, BasicRejectMethodHandler.getInstance());
+        ChannelOpenHandler.getInstance();
+        ChannelCloseHandler.getInstance();
+        ChannelCloseOkHandler.getInstance();
+        ConnectionCloseMethodHandler.getInstance();
+        ConnectionCloseOkMethodHandler.getInstance();
+        ConnectionTuneOkMethodHandler.getInstance();
+        ConnectionSecureOkMethodHandler.getInstance();
+        ConnectionStartOkMethodHandler.getInstance();
+        ExchangeDeclareHandler.getInstance();
+        ExchangeDeleteHandler.getInstance();
+        ExchangeBoundHandler.getInstance();
+        BasicAckMethodHandler.getInstance();
+        BasicRecoverMethodHandler.getInstance();
+        BasicConsumeMethodHandler.getInstance();
+        BasicGetMethodHandler.getInstance();
+        BasicCancelMethodHandler.getInstance();
+        BasicPublishMethodHandler.getInstance();
+        BasicQosHandler.getInstance();
+        QueueBindHandler.getInstance();
+        QueueDeclareHandler.getInstance();
+        QueueDeleteHandler.getInstance();
+        QueuePurgeHandler.getInstance();
+        ChannelFlowHandler.getInstance();
+        TxSelectHandler.getInstance();
+        TxCommitHandler.getInstance();
+        TxRollbackHandler.getInstance();
+        BasicRejectMethodHandler.getInstance();
 
         _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
 
         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
-        frame2handlerMap.put(ConnectionCloseOkBody.class, ConnectionCloseOkMethodHandler.getInstance());
+
         _state2HandlersMap.put(AMQState.CONNECTION_CLOSING, frame2handlerMap);
 
-    }
+    } */
 
     public AMQState getCurrentState()
     {
@@ -214,18 +184,25 @@
 
     public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
     {
-        StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod());
-        if (handler != null)
-        {
+        MethodDispatcher dispatcher = _protocolSession.getMethodDispatcher();
 
-            checkChannel(evt, _protocolSession);
+        final int channelId = evt.getChannelId();
+        B body = evt.getMethod();
 
-            handler.methodReceived(this, evt);
+        if(channelId != 0 && _protocolSession.getChannel(channelId)== null)
+        {
+
+            if(! ((body instanceof ChannelOpenBody)
+                  || (body instanceof ChannelCloseOkBody)
+                  || (body instanceof ChannelCloseBody)))
+            {
+                throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed");
+            }
 
-            return true;
         }
 
-        return false;
+        return body.execute(dispatcher, channelId);
+
     }
 
     private <B extends AMQMethodBody> void checkChannel(AMQMethodEvent<B> evt, AMQProtocolSession protocolSession)
@@ -239,6 +216,7 @@
         }
     }
 
+/*
     protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState currentState,
         B frame)
     // throws IllegalStateTransitionException
@@ -260,6 +238,7 @@
             return handler;
         }
     }
+*/
 
     public void addStateListener(StateListener listener)
     {

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java Fri Feb  8 02:09:37 2008
@@ -29,7 +29,7 @@
  * the opportunity to update state.
  *
  */
-public interface StateAwareMethodListener <B extends AMQMethodBody>
+public interface StateAwareMethodListener<B extends AMQMethodBody>
 {
-    void methodReceived(AMQStateManager stateManager,  AMQMethodEvent<B> evt) throws AMQException;
+    void methodReceived(AMQStateManager stateManager,  B evt, int channelId) throws AMQException;
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Fri Feb  8 02:09:37 2008
@@ -20,27 +20,26 @@
  */
 package org.apache.qpid.server.store;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.commons.configuration.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.Exchange;
 
-/**
- * A simple message store that stores the messages in a threadsafe structure in memory.
- */
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** A simple message store that stores the messages in a threadsafe structure in memory. */
 public class MemoryMessageStore implements MessageStore
 {
     private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
@@ -54,6 +53,7 @@
     protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap;
 
     private final AtomicLong _messageId = new AtomicLong(1);
+    private AtomicBoolean _closed = new AtomicBoolean(false);
 
     public void configure()
     {
@@ -77,6 +77,7 @@
 
     public void close() throws Exception
     {
+        _closed.getAndSet(true);
         if (_metaDataMap != null)
         {
             _metaDataMap.clear();
@@ -89,8 +90,9 @@
         }
     }
 
-    public void removeMessage(StoreContext context, Long messageId)
+    public void removeMessage(StoreContext context, Long messageId) throws AMQException
     {
+        checkNotClosed();
         if (_log.isDebugEnabled())
         {
             _log.debug("Removing message with id " + messageId);
@@ -172,9 +174,10 @@
     public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody)
             throws AMQException
     {
+        checkNotClosed();
         List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
 
-        if(bodyList == null && lastContentBody)
+        if (bodyList == null && lastContentBody)
         {
             _contentBodyMap.put(messageId, Collections.singletonList(contentBody));
         }
@@ -193,17 +196,28 @@
     public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData)
             throws AMQException
     {
+        checkNotClosed();
         _metaDataMap.put(messageId, messageMetaData);
     }
 
-    public MessageMetaData getMessageMetaData(StoreContext context,Long messageId) throws AMQException
+    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
     {
+        checkNotClosed();
         return _metaDataMap.get(messageId);
     }
 
     public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
     {
+        checkNotClosed();
         List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
         return bodyList.get(index);
+    }
+
+     private void checkNotClosed() throws MessageStoreClosedException
+     {
+        if (_closed.get())
+        {
+            throw new MessageStoreClosedException();
+        }
     }
 }

Added: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java Fri Feb  8 02:09:37 2008
@@ -0,0 +1,36 @@
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.AMQException;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * NOTE: this class currently extends AMQException, but we should only be using
+ * AMQException internally in the code base for protocol errors. The message store
+ * interface should therefore throw a different superclass, and this class should be
+ * moved to extend that superclass once it exists.
+ */
+public class MessageStoreClosedException extends AMQException
+{
+    public MessageStoreClosedException()
+    {
+        super(null, "Message store closed", null);
+    }
+}

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java Fri Feb  8 02:09:37 2008
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,9 +23,12 @@
 import org.apache.mina.common.IoAcceptor;
 import org.apache.mina.util.NewThreadExecutor;
 import org.apache.qpid.configuration.Configured;
+import org.apache.log4j.Logger;
 
 public class ConnectorConfiguration
 {
+    private static final Logger _logger = Logger.getLogger(ConnectorConfiguration.class);
+
     public static final String DEFAULT_PORT = "5672";
 
     public static final String SSL_PORT = "8672";
@@ -41,7 +44,7 @@
     @Configured(path = "connector.bind",
                 defaultValue = "wildcard")
     public String bindAddress;
-    
+
     @Configured(path = "connector.socketReceiveBuffer",
                 defaultValue = "32767")
     public int socketReceiveBufferSize;
@@ -69,29 +72,43 @@
     @Configured(path = "connector.ssl.enabled",
                 defaultValue = "false")
     public boolean enableSSL;
-    
+
     @Configured(path = "connector.ssl.sslOnly",
-    		    defaultValue = "true")
+                defaultValue = "true")
     public boolean sslOnly;
-    
+
     @Configured(path = "connector.ssl.port",
-            defaultValue = SSL_PORT)
-    public int sslPort;    
-    
+                defaultValue = SSL_PORT)
+    public int sslPort;
+
     @Configured(path = "connector.ssl.keystorePath",
-    			defaultValue = "none")
+                defaultValue = "none")
     public String keystorePath;
-    
+
     @Configured(path = "connector.ssl.keystorePassword",
-    			defaultValue = "none")
+                defaultValue = "none")
     public String keystorePassword;
-    
+
     @Configured(path = "connector.ssl.certType",
-    			defaultValue = "SunX509")
+                defaultValue = "SunX509")
     public String certType;
 
+    @Configured(path = "connector.qpidnio",
+                defaultValue = "true")
+    public boolean _multiThreadNIO;
+
+
     public IoAcceptor createAcceptor()
     {
-        return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor());     
+        if (_multiThreadNIO)
+        {
+            _logger.warn("Using Qpid Multithreaded IO Processing");
+            return new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(processors, new NewThreadExecutor());
+        }
+        else
+        {
+            _logger.warn("Using Mina IO Processing");
+            return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor());
+        }
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java Fri Feb  8 02:09:37 2008
@@ -20,6 +20,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.ack.UnacknowledgedMessage;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -138,16 +139,14 @@
         // The message is now fully received, we can stage it before enqueued if necessary
     }
 
-    public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst)
+    public void deliver(QueueEntry entry, boolean deliverFirst)
             throws
             AMQException
     {
         try
         {
-            //The message has been delivered to the queues
-            message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue);
             // add a record in the transaction
-            _transactionManager.getTransaction((Xid) _storeContext.getPayload()).addRecord(new EnqueueRecord(_storeContext, message, queue, deliverFirst));
+            _transactionManager.getTransaction((Xid) _storeContext.getPayload()).addRecord(new EnqueueRecord(_storeContext, entry, deliverFirst));
         } catch (Exception e)
         {
             throw new AMQException(null, "Problem during transaction rollback", e);
@@ -177,7 +176,7 @@
                     {
                         if (_log.isDebugEnabled())
                         {
-                            _log.debug("Discarding message: " + message.message.getMessageId());
+                            _log.debug("Discarding message: " + message.entry.debugIdentity());
                         }
 
                         //Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -203,7 +202,7 @@
                 {
                     if (_log.isDebugEnabled())
                     {
-                        _log.debug("Discarding message: " + msg.message.getMessageId());
+                        _log.debug("Discarding message: " + msg.entry.debugIdentity());
                     }
                     //Message has been ack so discard it. This will dequeue and decrement the reference.
                     dequeue(msg);
@@ -222,7 +221,7 @@
 
             if (_log.isDebugEnabled())
             {
-                _log.debug("Discarding message: " + msg.message.getMessageId());
+                _log.debug("Discarding message: " + msg.entry.debugIdentity());
             }
 
             //Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -231,7 +230,7 @@
             if (_log.isDebugEnabled())
             {
                 _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
-                        msg.message.getMessageId());
+                        msg.entry.debugIdentity());
             }
         }
     }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java Fri Feb  8 02:09:37 2008
@@ -24,6 +24,7 @@
 import org.apache.qpid.server.messageStore.MessageStore;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.StoreContext;
 
 /**
@@ -34,15 +35,13 @@
 public class EnqueueRecord implements TransactionRecord
 {
     private final StoreContext _storeContext;
-    private final AMQMessage _msg;
-    private final AMQQueue _queue;
+    private final QueueEntry _entry;
     private final boolean _first;
 
-    EnqueueRecord(StoreContext storeContext, AMQMessage msg, AMQQueue q, boolean firsr)
+    EnqueueRecord(StoreContext storeContext, QueueEntry entry, boolean firsr)
     {
         _storeContext = storeContext;
-        _msg = msg;
-        _queue = q;
+        _entry = entry;
         _first = firsr;
     }
 
@@ -52,7 +51,7 @@
     {
         try
         {
-            _queue.process(_storeContext, _msg, _first);
+            _entry.process(_storeContext, _first);
         }
         catch (AMQException e)
         {