You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/02/21 11:09:09 UTC

svn commit: r629731 [1/3] - in /incubator/qpid/branches/M2.1/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/q...

Author: rgodfrey
Date: Thu Feb 21 02:09:03 2008
New Revision: 629731

URL: http://svn.apache.org/viewvc?rev=629731&view=rev
Log:
QPID-790 : Performance Improvements

Added:
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/mina/common/
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/mina/common/support/
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java
Modified:
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/Main.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/Job.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Feb 21 02:09:03 2008
@@ -75,7 +75,7 @@
      * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
      * value of this represents the <b>last</b> tag sent out
      */
-    private AtomicLong _deliveryTag = new AtomicLong(0);
+    private long _deliveryTag = 0;
 
     /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
     private AMQQueue _defaultQueue;
@@ -99,8 +99,6 @@
 
     private final AtomicBoolean _suspended = new AtomicBoolean(false);
 
-    private final MessageRouter _exchanges;
-
     private TransactionalContext _txnContext, _nonTransactedContext;
 
     /**
@@ -124,7 +122,7 @@
     public boolean ENABLE_JMSXUserID;
 
 
-    public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
+    public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
             throws AMQException
     {
         //Set values from configuration
@@ -136,7 +134,7 @@
         _prefetch_HighWaterMark = DEFAULT_PREFETCH;
         _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
         _messageStore = messageStore;
-        _exchanges = exchanges;
+
         // by default the session is non-transactional
         _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
     }
@@ -297,7 +295,7 @@
 
     public long getNextDeliveryTag()
     {
-        return _deliveryTag.incrementAndGet();
+        return ++_deliveryTag;
     }
 
     public int getNextConsumerTag()
@@ -969,16 +967,19 @@
 
     public void processReturns(AMQProtocolSession session) throws AMQException
     {
-        for (RequiredDeliveryException bouncedMessage : _returnMessages)
+        if(!_returnMessages.isEmpty())
         {
-            AMQMessage message = bouncedMessage.getAMQMessage();
-            session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
-                                                             new AMQShortString(bouncedMessage.getMessage()));
+            for (RequiredDeliveryException bouncedMessage : _returnMessages)
+            {
+                AMQMessage message = bouncedMessage.getAMQMessage();
+                session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
+                                                                 new AMQShortString(bouncedMessage.getMessage()));
 
-            message.decrementReference(_storeContext);
-        }
+                message.decrementReference(_storeContext);
+            }
 
-        _returnMessages.clear();
+            _returnMessages.clear();
+        }
     }
 
     public boolean wouldSuspend(AMQMessage msg)

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/Main.java Thu Feb 21 02:09:03 2008
@@ -35,6 +35,7 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoAcceptor;
 import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.common.FixedSizeByteBufferAllocator;
 import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 import org.apache.qpid.AMQException;
@@ -275,7 +276,7 @@
         // once more testing of the performance of the simple allocator has been done
         if (!connectorConfig.enablePooledAllocator)
         {
-            ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+            ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
         }
 
         int port = connectorConfig.port;

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java Thu Feb 21 02:09:03 2008
@@ -212,10 +212,9 @@
                 _logger.debug("Publishing message to queue " + queues);
             }
 
-            for (AMQQueue q : queues)
-            {
-                payload.enqueue(q);
-            }
+            payload.enqueue(queues);
+
+
         }
     }
 

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Thu Feb 21 02:09:03 2008
@@ -26,6 +26,7 @@
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortStringTokenizer;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.management.MBeanConstructor;
 import org.apache.qpid.server.management.MBeanDescription;
@@ -40,11 +41,7 @@
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 
@@ -84,12 +81,21 @@
 
     private static final Logger _logger = Logger.getLogger(DestWildExchange.class);
 
-    private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues =
+    private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _bindingKey2queues =
+            new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+    private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _simpleBindingKey2queues =
+            new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+    private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _wildCardBindingKey2queues =
             new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
     // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
-    private static final String TOPIC_SEPARATOR = ".";
-    private static final String AMQP_STAR = "*";
-    private static final String AMQP_HASH = "#";
+    private static final byte TOPIC_SEPARATOR = (byte)'.';
+    private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString(".");
+    private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*");
+    private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#");
+    private ConcurrentHashMap<AMQShortString, AMQShortString[]> _bindingKey2Tokenized =
+            new ConcurrentHashMap<AMQShortString, AMQShortString[]>();
+    private static final byte HASH_BYTE = (byte)'#';
+    private static final byte STAR_BYTE = (byte)'*';
 
     /** DestWildExchangeMBean class implements the management interface for the Topic exchanges. */
     @MBeanDescription("Management Bean for Topic Exchange")
@@ -107,7 +113,7 @@
         public TabularData bindings() throws OpenDataException
         {
             _bindingList = new TabularDataSupport(_bindinglistDataType);
-            for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _routingKey2queues.entrySet())
+            for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _bindingKey2queues.entrySet())
             {
                 AMQShortString key = entry.getKey();
                 List<String> queueList = new ArrayList<String>();
@@ -156,27 +162,75 @@
         assert queue != null;
         assert rKey != null;
 
-        AMQShortString routingKey = normalize(rKey);
+        _logger.debug("Registering queue " + queue.getName() + " with routing key " + rKey);
 
-        _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey);
         // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition
-        List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
+        List<AMQQueue> queueList = _bindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>());
+
+
+
+
+
+
+
         // if we got null back, no previous value was associated with the specified routing key hence
         // we need to read back the new value just put into the map
         if (queueList == null)
         {
-            queueList = _routingKey2queues.get(routingKey);
+            queueList = _bindingKey2queues.get(rKey);
         }
 
+
+
         if (!queueList.contains(queue))
         {
             queueList.add(queue);
+
+
+            if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
+            {
+                AMQShortString routingKey = normalize(rKey);
+                List<AMQQueue> queueList2 = _wildCardBindingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
+
+                if(queueList2 == null)
+                {
+                    queueList2 = _wildCardBindingKey2queues.get(routingKey);
+                    AMQShortStringTokenizer keyTok = routingKey.tokenize(TOPIC_SEPARATOR);
+
+                    ArrayList<AMQShortString> keyTokList = new ArrayList<AMQShortString>(keyTok.countTokens());
+
+                    while (keyTok.hasMoreTokens())
+                    {
+                        keyTokList.add(keyTok.nextToken());
+                    }
+
+                    _bindingKey2Tokenized.put(routingKey, keyTokList.toArray(new AMQShortString[keyTokList.size()]));
+                }
+                queueList2.add(queue);
+
+            }
+            else
+            {
+                List<AMQQueue> queueList2 = _simpleBindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>());
+                if(queueList2 == null)
+                {
+                    queueList2 = _simpleBindingKey2queues.get(rKey);
+                }
+                queueList2.add(queue);
+
+            }
+
+
+
+
         }
         else if (_logger.isDebugEnabled())
         {
-            _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey);
+            _logger.debug("Queue " + queue + " is already registered with routing key " + rKey);
         }
 
+
+
     }
 
     private AMQShortString normalize(AMQShortString routingKey)
@@ -186,53 +240,58 @@
             routingKey = AMQShortString.EMPTY_STRING;
         }
         
-        StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR);
-        List<String> _subscription = new ArrayList<String>();
+        AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR);
+
+        List<AMQShortString> subscriptionList = new ArrayList<AMQShortString>();
 
         while (routingTokens.hasMoreTokens())
         {
-            _subscription.add(routingTokens.nextToken());
+            subscriptionList.add(routingTokens.nextToken());
         }
 
-        int size = _subscription.size();
+        int size = subscriptionList.size();
 
         for (int index = 0; index < size; index++)
         {
             // if there are more levels
             if ((index + 1) < size)
             {
-                if (_subscription.get(index).equals(AMQP_HASH))
+                if (subscriptionList.get(index).equals(AMQP_HASH_TOKEN))
                 {
-                    if (_subscription.get(index + 1).equals(AMQP_HASH))
+                    if (subscriptionList.get(index + 1).equals(AMQP_HASH_TOKEN))
                     {
                         // we don't need #.# delete this one
-                        _subscription.remove(index);
+                        subscriptionList.remove(index);
                         size--;
                         // redo this normalisation
                         index--;
                     }
 
-                    if (_subscription.get(index + 1).equals(AMQP_STAR))
+                    if (subscriptionList.get(index + 1).equals(AMQP_STAR_TOKEN))
                     {
                         // we don't want #.* swap to *.#
                         // remove it and put it in at index + 1
-                        _subscription.add(index + 1, _subscription.remove(index));
+                        subscriptionList.add(index + 1, subscriptionList.remove(index));
                     }
                 }
             } // if we have more levels
         }
 
-        StringBuilder sb = new StringBuilder();
 
-        for (String s : _subscription)
+
+        AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING);
+/*
+        StringBuilder sb = new StringBuilder();
+        for (AMQShortString s : subscriptionList)
         {
             sb.append(s);
             sb.append(TOPIC_SEPARATOR);
         }
 
         sb.deleteCharAt(sb.length() - 1);
+*/
 
-        return new AMQShortString(sb.toString());
+        return normalizedString;
     }
 
     public void route(AMQMessage payload) throws AMQException
@@ -254,19 +313,14 @@
             else
             {
                 _logger.warn("No queues found for routing key " + routingKey);
-                _logger.warn("Routing map contains: " + _routingKey2queues);
+                _logger.warn("Routing map contains: " + _bindingKey2queues);
 
                 return;
             }
         }
 
-        for (AMQQueue q : queues)
-        {
-            // TODO: modify code generator to add clone() method then clone the deliver body
-            // without this addition we have a race condition - we will be modifying the body
-            // before the encoder has encoded the body for delivery
-            payload.enqueue(q);
-        }
+        payload.enqueue(queues);
+
     }
 
     public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
@@ -276,21 +330,21 @@
 
     public boolean isBound(AMQShortString routingKey, AMQQueue queue)
     {
-        List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
+        List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey));
 
         return (queues != null) && queues.contains(queue);
     }
 
     public boolean isBound(AMQShortString routingKey)
     {
-        List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
+        List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey));
 
         return (queues != null) && !queues.isEmpty();
     }
 
     public boolean isBound(AMQQueue queue)
     {
-        for (List<AMQQueue> queues : _routingKey2queues.values())
+        for (List<AMQQueue> queues : _bindingKey2queues.values())
         {
             if (queues.contains(queue))
             {
@@ -303,7 +357,7 @@
 
     public boolean hasBindings()
     {
-        return !_routingKey2queues.isEmpty();
+        return !_bindingKey2queues.isEmpty();
     }
 
     public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
@@ -311,13 +365,11 @@
         assert queue != null;
         assert rKey != null;
 
-        AMQShortString routingKey = normalize(rKey);
-
-        List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+        List<AMQQueue> queues = _bindingKey2queues.get(rKey);
         if (queues == null)
         {
             throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
-                                   + " with routing key " + routingKey + ". No queue was registered with that _routing key");
+                                   + " with routing key " + rKey + ". No queue was registered with that _routing key");
 
         }
 
@@ -325,12 +377,39 @@
         if (!removedQ)
         {
             throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
-                                   + " with routing key " + routingKey);
+                                   + " with routing key " + rKey);
+        }
+
+
+        if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
+        {
+            AMQShortString bindingKey = normalize(rKey);
+            List<AMQQueue> queues2 = _wildCardBindingKey2queues.get(bindingKey);
+            queues2.remove(queue);
+            if(queues2.isEmpty())
+            {
+                _wildCardBindingKey2queues.remove(bindingKey);
+                _bindingKey2Tokenized.remove(bindingKey);
+            }
+
         }
+        else
+        {
+            List<AMQQueue> queues2 = _simpleBindingKey2queues.get(rKey);
+            queues2.remove(queue);
+            if(queues2.isEmpty())
+            {
+                _simpleBindingKey2queues.remove(rKey);
+            }
+
+        }
+
+
+
 
         if (queues.isEmpty())
         {
-            _routingKey2queues.remove(routingKey);
+            _bindingKey2queues.remove(rKey);
         }
     }
 
@@ -349,117 +428,167 @@
 
     public Map<AMQShortString, List<AMQQueue>> getBindings()
     {
-        return _routingKey2queues;
+        return _bindingKey2queues;
     }
 
     private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
     {
-        List<AMQQueue> list = new LinkedList<AMQQueue>();
-        StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR);
 
-        ArrayList<String> routingkeyList = new ArrayList<String>();
+        List<AMQQueue> list = null;
 
-        while (routingTokens.hasMoreTokens())
+        if(!_wildCardBindingKey2queues.isEmpty())
         {
-            String next = routingTokens.nextToken();
-            if (next.equals(AMQP_HASH) && routingkeyList.get(routingkeyList.size() - 1).equals(AMQP_HASH))
-            {
-                continue;
-            }
 
-            routingkeyList.add(next);
-        }
 
-        for (AMQShortString queue : _routingKey2queues.keySet())
-        {
-            StringTokenizer queTok = new StringTokenizer(queue.toString(), TOPIC_SEPARATOR);
+            AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR);
+
+            final int routingTokensCount = routingTokens.countTokens();
 
-            ArrayList<String> queueList = new ArrayList<String>();
 
-            while (queTok.hasMoreTokens())
+            AMQShortString[] routingkeyTokens = new AMQShortString[routingTokensCount];
+
+            if(routingTokensCount == 1)
             {
-                queueList.add(queTok.nextToken());
+                routingkeyTokens[0] =routingKey;
             }
+            else
+            {
 
-            int depth = 0;
-            boolean matching = true;
-            boolean done = false;
-            int routingskip = 0;
-            int queueskip = 0;
 
-            while (matching && !done)
-            {
-                if ((queueList.size() == (depth + queueskip)) || (routingkeyList.size() == (depth + routingskip)))
+                int token = 0;
+                while (routingTokens.hasMoreTokens())
                 {
-                    done = true;
 
-                    // if it was the routing key that ran out of digits
-                    if (routingkeyList.size() == (depth + routingskip))
-                    {
-                        if (queueList.size() > (depth + queueskip))
-                        { // a hash and it is the last entry
-                            matching =
-                                    queueList.get(depth + queueskip).equals(AMQP_HASH)
-                                    && (queueList.size() == (depth + queueskip + 1));
-                        }
-                    }
-                    else if (routingkeyList.size() > (depth + routingskip))
+                    AMQShortString next = routingTokens.nextToken();
+        /*            if (next.equals(AMQP_HASH) && routingkeyTokens.get(routingkeyTokens.size() - 1).equals(AMQP_HASH))
                     {
-                        // There is still more routing key to check
-                        matching = false;
+                        continue;
                     }
+        */
 
-                    continue;
+                    routingkeyTokens[token++] = next;
                 }
+            }
+            for (AMQShortString bindingKey : _wildCardBindingKey2queues.keySet())
+            {
+
+                AMQShortString[] bindingKeyTokens = _bindingKey2Tokenized.get(bindingKey);
+
+
+                boolean matching = true;
+                boolean done = false;
 
-                // if the values on the two topics don't match
-                if (!queueList.get(depth + queueskip).equals(routingkeyList.get(depth + routingskip)))
+                int depthPlusRoutingSkip = 0;
+                int depthPlusQueueSkip = 0;
+
+                final int bindingKeyTokensCount = bindingKeyTokens.length;
+
+                while (matching && !done)
                 {
-                    if (queueList.get(depth + queueskip).equals(AMQP_STAR))
+
+                    if ((bindingKeyTokensCount == depthPlusQueueSkip) || (routingTokensCount == depthPlusRoutingSkip))
                     {
-                        depth++;
+                        done = true;
+
+                        // if it was the routing key that ran out of digits
+                        if (routingTokensCount == depthPlusRoutingSkip)
+                        {
+                            if (bindingKeyTokensCount > depthPlusQueueSkip)
+                            { // a hash and it is the last entry
+                                matching =
+                                        bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN)
+                                        && (bindingKeyTokensCount == (depthPlusQueueSkip + 1));
+                            }
+                        }
+                        else if (routingTokensCount > depthPlusRoutingSkip)
+                        {
+                            // There is still more routing key to check
+                            matching = false;
+                        }
 
                         continue;
                     }
-                    else if (queueList.get(depth + queueskip).equals(AMQP_HASH))
+
+                    // if the values on the two topics don't match
+                    if (!bindingKeyTokens[depthPlusQueueSkip].equals(routingkeyTokens[depthPlusRoutingSkip]))
                     {
-                        // Is this a # at the end
-                        if (queueList.size() == (depth + queueskip + 1))
+                        if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_STAR_TOKEN))
                         {
-                            done = true;
+                            depthPlusQueueSkip++;
+                            depthPlusRoutingSkip++;
 
                             continue;
                         }
-
-                        // otherwise # in the middle
-                        while (routingkeyList.size() > (depth + routingskip))
+                        else if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN))
                         {
-                            if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1)))
+                            // Is this a # at the end
+                            if (bindingKeyTokensCount == (depthPlusQueueSkip + 1))
+                            {
+                                done = true;
+
+                                continue;
+                            }
+
+                            // otherwise # in the middle
+                            while (routingTokensCount > depthPlusRoutingSkip)
                             {
-                                queueskip++;
-                                depth++;
+                                if (routingkeyTokens[depthPlusRoutingSkip].equals(bindingKeyTokens[depthPlusQueueSkip + 1]))
+                                {
+                                    depthPlusQueueSkip += 2;
+                                    depthPlusRoutingSkip++;
+
+                                    break;
+                                }
 
-                                break;
+                                depthPlusRoutingSkip++;
                             }
 
-                            routingskip++;
+                            continue;
                         }
 
-                        continue;
+                        matching = false;
                     }
 
-                    matching = false;
+                    depthPlusQueueSkip++;
+                    depthPlusRoutingSkip++;
                 }
 
-                depth++;
+                if (matching)
+                {
+                    if(list == null)
+                    {
+                        list = new ArrayList<AMQQueue>(_wildCardBindingKey2queues.get(bindingKey));
+                    }
+                    else
+                    {
+                        list.addAll(_wildCardBindingKey2queues.get(bindingKey));
+                    }
+                }
             }
 
-            if (matching)
+        }
+        if(!_simpleBindingKey2queues.isEmpty())
+        {
+            List<AMQQueue> queues = _simpleBindingKey2queues.get(routingKey);
+            if(list == null)
+            {
+                if(queues == null)
+                {
+                    list =  Collections.EMPTY_LIST;
+                }
+                else
+                {
+                    list = new ArrayList<AMQQueue>(queues);
+                }
+            }
+            else if(queues != null)
             {
-                list.addAll(_routingKey2queues.get(queue));
+                list.addAll(queues);
             }
+
         }
 
         return list;
+
     }
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Thu Feb 21 02:09:03 2008
@@ -42,6 +42,7 @@
 import javax.management.openmbean.TabularDataSupport;
 import java.util.List;
 import java.util.Map;
+import java.util.ArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 
 public class FanoutExchange extends AbstractExchange
@@ -205,10 +206,8 @@
                 _logger.debug("Publishing message to queue " + _queues);
             }
 
-            for (AMQQueue q : _queues)
-            {
-                payload.enqueue(q);
-            }
+            payload.enqueue(new ArrayList(_queues));
+
         }
     }
 

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java Thu Feb 21 02:09:03 2008
@@ -29,7 +29,6 @@
 import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
 import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
-import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -55,8 +54,8 @@
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
 
-        final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore(),
-                                                  virtualHost.getExchangeRegistry());
+        final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore()
+        );
         session.addChannel(channel);
 
         ChannelOpenOkBody response;

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Thu Feb 21 02:09:03 2008
@@ -100,8 +100,8 @@
             ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
 
             AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
-            AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+            AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
             writeFrame(compositeBlock);
 
             //
@@ -151,8 +151,8 @@
             ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
 
             AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
-            AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+            AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
             writeFrame(compositeBlock);
 
             //
@@ -247,14 +247,13 @@
         if (bodyFrameIterator.hasNext())
         {
             AMQDataBlock firstContentBody = bodyFrameIterator.next();
-            AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+            AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
             writeFrame(compositeBlock);
         }
         else
         {
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
-                                                                             new AMQDataBlock[]{contentHeader});
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
 
             writeFrame(compositeBlock);
         }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Thu Feb 21 02:09:03 2008
@@ -12,11 +12,14 @@
 import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
 {
     private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+    private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
 
 
     public static Factory getInstanceFactory()
@@ -47,10 +50,9 @@
     public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
             throws AMQException
     {
-        AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+        AMQBody deliverBody = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
         final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      contentHeaderBody);
+
 
         final AMQMessageHandle messageHandle = message.getMessageHandle();
         final StoreContext storeContext = message.getStoreContext();
@@ -60,8 +62,8 @@
 
         if(bodyCount == 0)
         {
-            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                             contentHeader);
+            SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+                                                                             contentHeaderBody);
 
             writeFrame(compositeBlock);
         }
@@ -75,9 +77,9 @@
             //
             ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
 
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
-            AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+            AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
+
+            CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
             writeFrame(compositeBlock);
 
             //
@@ -86,7 +88,7 @@
             for(int i = 1; i < bodyCount; i++)
             {
                 cb = messageHandle.getContentChunk(storeContext,messageId, i);
-                writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+                writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
             }
 
 
@@ -95,6 +97,14 @@
 
     }
 
+    private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+    {
+        
+        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                      contentHeaderBody);
+        return contentHeader;
+    }
+
 
     public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
     {
@@ -106,8 +116,7 @@
         AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
 
 
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      message.getContentHeaderBody());
+        AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
 
         final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
         if(bodyCount == 0)
@@ -126,9 +135,9 @@
             //
             ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
 
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
-            AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+            AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
+            AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
             writeFrame(compositeBlock);
 
             //
@@ -137,7 +146,7 @@
             for(int i = 1; i < bodyCount; i++)
             {
                 cb = messageHandle.getContentChunk(storeContext, messageId, i);
-                writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+                writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
             }
 
 
@@ -147,7 +156,7 @@
     }
 
 
-    private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
+    private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
             throws AMQException
     {
         final MessagePublishInfo pb = message.getMessagePublishInfo();
@@ -158,23 +167,53 @@
         final AMQShortString exchangeName = pb.getExchange();
         final AMQShortString routingKey = pb.getRoutingKey();
 
-        final AMQDataBlock returnBlock = new DeferredDataBlock()
+        final AMQBody returnBlock = new AMQBody()
         {
 
-            protected AMQDataBlock createAMQDataBlock()
+            public AMQBody _underlyingBody;
+
+            public AMQBody createAMQBody()
             {
-                BasicDeliverBody deliverBody =
-                        METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+                return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
                                                               deliveryTag,
                                                               isRedelivered,
                                                               exchangeName,
                                                               routingKey);
-                AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
 
 
-                return deliverFrame;
+
+
 
             }
+
+            public byte getFrameType()
+            {
+                return AMQMethodBody.TYPE;
+            }
+
+            public int getSize()
+            {
+                if(_underlyingBody == null)
+                {
+                    _underlyingBody = createAMQBody();
+                }
+                return _underlyingBody.getSize();
+            }
+
+            public void writePayload(ByteBuffer buffer)
+            {
+                if(_underlyingBody == null)
+                {
+                    _underlyingBody = createAMQBody();
+                }
+                _underlyingBody.writePayload(buffer);
+            }
+
+            public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+                throws AMQException
+            {
+                throw new AMQException("This block should never be dispatched!");
+            }
         };
         return returnBlock;
     }
@@ -225,8 +264,7 @@
     {
         AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
 
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      message.getContentHeaderBody());
+        AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
 
         Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
         //
@@ -236,14 +274,13 @@
         if (bodyFrameIterator.hasNext())
         {
             AMQDataBlock firstContentBody = bodyFrameIterator.next();
-            AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+            AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
             writeFrame(compositeBlock);
         }
         else
         {
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
-                                                                             new AMQDataBlock[]{contentHeader});
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
 
             writeFrame(compositeBlock);
         }
@@ -272,4 +309,64 @@
         writeFrame(basicCancelOkBody.generateFrame(channelId));
 
     }
+
+
+    public static final class CompositeAMQBodyBlock extends AMQDataBlock
+    {
+        public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+        private final AMQBody _methodBody;
+        private final AMQBody _headerBody;
+        private final AMQBody _contentBody;
+        private final int _channel;
+
+
+        public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+        {
+            _channel = channel;
+            _methodBody = methodBody;
+            _headerBody = headerBody;
+            _contentBody = contentBody;
+
+        }
+
+        public long getSize()
+        {
+            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+        }
+
+        public void writePayload(ByteBuffer buffer)
+        {
+            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+        }
+    }
+
+    public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+    {
+        public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+        private final AMQBody _methodBody;
+        private final AMQBody _headerBody;
+        private final int _channel;
+
+
+        public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+        {
+            _channel = channel;
+            _methodBody = methodBody;
+            _headerBody = headerBody;
+
+        }
+
+        public long getSize()
+        {
+            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+        }
+
+        public void writePayload(ByteBuffer buffer)
+        {
+            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+        }
+    }
+
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Thu Feb 21 02:09:03 2008
@@ -208,27 +208,10 @@
         {
             _logger.debug("Frame Received: " + frame);
         }
+                
+
+        body.handle(channelId, this);
 
-        if (body instanceof AMQMethodBody)
-        {
-            methodFrameReceived(channelId, (AMQMethodBody) body);
-        }
-        else if (body instanceof ContentHeaderBody)
-        {
-            contentHeaderReceived(channelId, (ContentHeaderBody) body);
-        }
-        else if (body instanceof ContentBody)
-        {
-            contentBodyReceived(channelId, (ContentBody) body);
-        }
-        else if (body instanceof HeartbeatBody)
-        {
-            // NO OP
-        }
-        else
-        {
-            _logger.warn("Unrecognised frame " + frame.getClass().getName());
-        }
     }
 
     private void protocolInitiationReceived(ProtocolInitiation pi)
@@ -271,7 +254,7 @@
         }
     }
 
-    private void methodFrameReceived(int channelId, AMQMethodBody methodBody)
+    public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
     {
 
         final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
@@ -365,7 +348,7 @@
         }
         catch (Exception e)
         {
-            _stateManager.error(e);
+            
             for (AMQMethodListener listener : _frameListeners)
             {
                 listener.error(e);
@@ -375,7 +358,7 @@
         }
     }
 
-    private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
+    public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
     {
 
         AMQChannel channel = getAndAssertChannel(channelId);
@@ -384,11 +367,16 @@
 
     }
 
-    private void contentBodyReceived(int channelId, ContentBody body) throws AMQException
+    public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
     {
         AMQChannel channel = getAndAssertChannel(channelId);
 
         channel.publishContentBody(body, this);
+    }
+
+    public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
+    {
+        // NO - OP
     }
 
     /**

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Thu Feb 21 02:09:03 2008
@@ -30,6 +30,7 @@
 import org.apache.mina.filter.SSLFilter;
 import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.QpidProtocolCodecFilter;
 import org.apache.mina.filter.executor.ExecutorFilter;
 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 import org.apache.mina.util.SessionUtil;
@@ -82,7 +83,7 @@
         createSession(protocolSession, _applicationRegistry, codecFactory);
         _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress());
 
-        final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory);
+        final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory);
 
         ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance().
                 getConfiguredObject(ConnectorConfiguration.class);

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Feb 21 02:09:03 2008
@@ -83,7 +83,7 @@
     private long _expiration;
 
 
-    private final int hashcode = System.identityHashCode(this);
+
 
     private Exchange _exchange;
     private static final boolean SYNCED_CLOCKS =
@@ -92,7 +92,7 @@
 
     public String debugIdentity()
     {
-        return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
+        return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
     }
 
     public void setExpiration()
@@ -141,6 +141,11 @@
         _exchange.route(this);
     }
 
+    public void enqueue(final List<AMQQueue> queues)
+    {
+        _transientMessageData.setDestinationQueues(queues);
+    }
+
     /**
      * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
      * therefore is memory-efficient.
@@ -664,7 +669,7 @@
         }
         finally
         {
-            destinationQueues.clear();
+
             // Remove refence for routing process . Reference count should now == delivered queue count
             decrementReference(storeContext);
         }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Feb 21 02:09:03 2008
@@ -37,9 +37,7 @@
 
 import javax.management.JMException;
 import java.text.MessageFormat;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -158,6 +156,8 @@
     public AtomicLong _totalMessagesReceived = new AtomicLong();
 
 
+    private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
+
 
     public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
             throws AMQException
@@ -200,6 +200,13 @@
         _subscribers = subscribers;
         _subscriptionFactory = subscriptionFactory;
         _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
+
+        // This ensures that the notification checks for the configured alerts are created.
+        setMaximumMessageAge(_maximumMessageAge);
+        setMaximumMessageCount(_maximumMessageCount);
+        setMaximumMessageSize(_maximumMessageSize);
+        setMaximumQueueDepth(_maximumQueueDepth);
+
     }
 
     private AMQQueueMBean createMBean() throws AMQException
@@ -214,7 +221,7 @@
         }
     }
 
-    public AMQShortString getName()
+    public final AMQShortString getName()
     {
         return _name;
     }
@@ -540,9 +547,17 @@
         return _maximumMessageSize;
     }
 
-    public void setMaximumMessageSize(long value)
+    public void setMaximumMessageSize(final long maximumMessageSize)
     {
-        _maximumMessageSize = value;
+        _maximumMessageSize = maximumMessageSize;
+        if(maximumMessageSize == 0L)
+        {
+            _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
+        }
+        else
+        {
+            _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
+        }
     }
 
     public int getConsumerCount()
@@ -565,9 +580,20 @@
         return _maximumMessageCount;
     }
 
-    public void setMaximumMessageCount(long value)
+    public void setMaximumMessageCount(final long maximumMessageCount)
     {
-        _maximumMessageCount = value;
+        _maximumMessageCount = maximumMessageCount;
+        if(maximumMessageCount == 0L)
+        {
+            _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
+        }
+        else
+        {
+            _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
+        }
+
+
+
     }
 
     public long getMaximumQueueDepth()
@@ -576,9 +602,18 @@
     }
 
     // Sets the queue depth, the max queue size
-    public void setMaximumQueueDepth(long value)
+    public void setMaximumQueueDepth(final long maximumQueueDepth)
     {
-        _maximumQueueDepth = value;
+        _maximumQueueDepth = maximumQueueDepth;
+        if(maximumQueueDepth == 0L)
+        {
+            _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
+        }
+        else
+        {
+            _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
+        }
+
     }
 
     public long getOldestMessageArrivalTime()
@@ -661,6 +696,10 @@
         }
 
         _subscribers.addSubscriber(subscription);
+        if(exclusive)
+        {
+            _subscribers.setExclusive(true);
+        }
     }
 
     private boolean isExclusive()
@@ -692,6 +731,7 @@
                     ps, channel, consumerTag, this));
         }
 
+        _subscribers.setExclusive(false);
         Subscription removedSubscription;
         if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps,
                                                                                                          consumerTag)))
@@ -805,7 +845,7 @@
     public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException
     {
         AMQMessage msg = entry.getMessage();
-        _deliveryMgr.deliver(storeContext, getName(), entry, deliverFirst);
+        _deliveryMgr.deliver(storeContext, _name, entry, deliverFirst);
         try
         {
             msg.checkDeliveredToConsumer();
@@ -938,6 +978,14 @@
     public void setMaximumMessageAge(long maximumMessageAge)
     {
         _maximumMessageAge = maximumMessageAge;
+        if(maximumMessageAge == 0L)
+        {
+            _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
+        }
+        else
+        {
+            _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
+        }
     }
 
     public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry)
@@ -965,5 +1013,10 @@
                 _deliveryMgr.removeExpired();
             }
         }
+    }
+
+    public final Set<NotificationCheck> getNotificationChecks()
+    {
+        return _notificationChecks;
     }
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Thu Feb 21 02:09:03 2008
@@ -54,10 +54,7 @@
 import javax.management.openmbean.TabularType;
 
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 
 /**
  * AMQQueueMBean is the management bean for an {@link AMQQueue}.
@@ -97,6 +94,9 @@
     private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
     private Notification _lastNotification = null;
 
+
+
+
     @MBeanConstructor("Creates an MBean exposing an AMQQueue")
     public AMQQueueMBean(AMQQueue queue) throws JMException
     {
@@ -249,16 +249,21 @@
     public void checkForNotification(AMQMessage msg) throws AMQException, JMException
     {
 
-        final long currentTime = System.currentTimeMillis();
-        final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
+        final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
 
-        for (NotificationCheck check : NotificationCheck.values())
+        if(!notificationChecks.isEmpty())
         {
-            if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
+            final long currentTime = System.currentTimeMillis();
+            final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
+
+            for (NotificationCheck check : notificationChecks)
             {
-                if (check.notifyIfNecessary(msg, _queue, this))
+                if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
                 {
-                    _lastNotificationTimes[check.ordinal()] = currentTime;
+                    if (check.notifyIfNecessary(msg, _queue, this))
+                    {
+                        _lastNotificationTimes[check.ordinal()] = currentTime;
+                    }
                 }
             }
         }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Thu Feb 21 02:09:03 2008
@@ -363,8 +363,9 @@
 
                     protocolSession.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
                                                                             deliveryTag, _queue.getMessageCount());
-                    _totalMessageSize.addAndGet(-entry.getSize());
+
                 }
+                _totalMessageSize.addAndGet(-entry.getSize());
 
                 if (!acks)
                 {
@@ -918,7 +919,7 @@
                 {
                     if (!s.isSuspended())
                     {
-                        if (_log.isDebugEnabled())
+                        if (debugEnabled)
                         {
                             _log.debug(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" +
                                        System.identityHashCode(s) + ") :" + s);

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Thu Feb 21 02:09:03 2008
@@ -29,9 +29,9 @@
     {
         boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
         {
-            int msgCount = queue.getMessageCount();
+            int msgCount;
             final long maximumMessageCount = queue.getMaximumMessageCount();
-            if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount)
+            if (maximumMessageCount!= 0 && (msgCount =  queue.getMessageCount()) >= maximumMessageCount)
             {
                 listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
                 return true;

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Thu Feb 21 02:09:03 2008
@@ -292,14 +292,17 @@
                 queue.dequeue(storeContext, entry);
             }
 
+/*
+            if (_sendLock.get())
+            {
+                _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
+            }
+*/
+
             synchronized (channel)
             {
                 long deliveryTag = channel.getNextDeliveryTag();
 
-                if (_sendLock.get())
-                {
-                    _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
-                }
 
                 if (_acks)
                 {
@@ -308,10 +311,11 @@
 
                 protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag);
 
-                if (!_acks)
-                {
-                    entry.getMessage().decrementReference(storeContext);
-                }
+
+            }
+            if (!_acks)
+            {
+                entry.getMessage().decrementReference(storeContext);
             }
         }
         finally
@@ -367,59 +371,60 @@
 //            return false;
         }
 
-        final AMQProtocolSession publisher = entry.getMessage().getPublisher();
+
 
         //todo - client id should be recoreded and this test removed but handled below
-        if (_noLocal && publisher != null)
+        if (_noLocal)
         {
-            // We don't want local messages so check to see if message is one we sent
-            Object localInstance;
-            Object msgInstance;
 
-            if ((protocolSession.getClientProperties() != null) &&
-                (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+            final AMQProtocolSession publisher = entry.getMessage().getPublisher();
+            if(publisher != null)
+
             {
+                // We don't want local messages so check to see if message is one we sent
+                Object localInstance;
+                Object msgInstance;
 
-                if ((publisher.getClientProperties() != null) &&
-                    (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+                if ((protocolSession.getClientProperties() != null) &&
+                    (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
                 {
-                    if (localInstance == msgInstance || localInstance.equals(msgInstance))
+
+                    if ((publisher.getClientProperties() != null) &&
+                        (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
                     {
-//                        if (_logger.isTraceEnabled())
-//                        {
-//                            _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
-//                                          msg.debugIdentity() + ")");
-//                        }
-                        return false;
+                        if (localInstance == msgInstance || localInstance.equals(msgInstance))
+                        {
+    //                        if (_logger.isTraceEnabled())
+    //                        {
+    //                            _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
+    //                                          msg.debugIdentity() + ")");
+    //                        }
+                            return false;
+                        }
                     }
                 }
-            }
-            else
-            {
+                else
+                {
 
-                localInstance = protocolSession.getClientIdentifier();
-                //todo - client id should be recoreded and this test removed but handled here
+                    localInstance = protocolSession.getClientIdentifier();
+                    //todo - client id should be recorded and this test removed but handled here
 
-                msgInstance = publisher.getClientIdentifier();
-                if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
-                {
-//                    if (_logger.isTraceEnabled())
-//                    {
-//                        _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
-//                                      msg.debugIdentity() + ")");
-//                    }
-                    return false;
+                    msgInstance = publisher.getClientIdentifier();
+                    if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
+                    {
+    //                    if (_logger.isTraceEnabled())
+    //                    {
+    //                        _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
+    //                                      msg.debugIdentity() + ")");
+    //                    }
+                        return false;
+                    }
                 }
-            }
-
 
+            }
         }
 
 
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
-        }
         return checkFilters(entry);
 
     }
@@ -433,23 +438,7 @@
 
     private boolean checkFilters(QueueEntry msg)
     {
-        if (_filters != null)
-        {
-//            if (_logger.isTraceEnabled())
-//            {
-//                _logger.trace("(" + debugIdentity() + ") has filters.");
-//            }
-            return _filters.allAllow(msg.getMessage());
-        }
-        else
-        {
-//            if (_logger.isTraceEnabled())
-//            {
-//                _logger.trace("(" + debugIdentity() + ") has no filters");
-//            }
-
-            return true;
-        }
+        return (_filters == null) || _filters.allAllow(msg.getMessage());
     }
 
     public Queue<QueueEntry> getPreDeliveryQueue()
@@ -613,7 +602,7 @@
 
     public boolean wouldSuspend(QueueEntry msg)
     {
-        return channel.wouldSuspend(msg.getMessage());
+        return _acks && channel.wouldSuspend(msg.getMessage());
     }
 
     public Queue<QueueEntry> getResendQueue()

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Thu Feb 21 02:09:03 2008
@@ -39,6 +39,7 @@
     private int _currentSubscriber;
 
     private final Object _changeLock = new Object();
+    private volatile boolean _exclusive;
 
 
     /** Accessor for unit tests. */
@@ -116,10 +117,7 @@
      */
     public Subscription nextSubscriber(QueueEntry msg)
     {
-        if (_subscriptions.isEmpty())
-        {
-            return null;
-        }
+
 
         try
         {
@@ -143,30 +141,64 @@
 
     private Subscription nextSubscriberImpl(QueueEntry msg)
     {
-        final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
-        while (iterator.hasNext())
+        if(_exclusive)
         {
-            Subscription subscription = iterator.next();
-            ++_currentSubscriber;
-            subscriberScanned();
-
-            if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
+            try
             {
-                if (subscription.hasInterest(msg))
+                Subscription subscription = _subscriptions.get(0);
+                subscriberScanned();
+
+                if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
                 {
-                    // if the queue is not empty then this client is ready to receive a message.
-                    //FIXME the queue could be full of sent messages.
-                    // Either need to clean all PDQs after sending a message
-                    // OR have a clean up thread that runs the PDQs expunging the messages.
-                    if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+                    if (subscription.hasInterest(msg))
                     {
-                        return subscription;
+                        // if the queue is not empty then this client is ready to receive a message.
+                        //FIXME the queue could be full of sent messages.
+                        // Either need to clean all PDQs after sending a message
+                        // OR have a clean up thread that runs the PDQs expunging the messages.
+                        if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+                        {
+                            return subscription;
+                        }
                     }
                 }
             }
+            catch(IndexOutOfBoundsException e)
+            {
+            }
+            return null;
         }
+        else
+        {
+            if (_subscriptions.isEmpty())
+            {
+                return null;
+            }
+            final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
+            while (iterator.hasNext())
+            {
+                Subscription subscription = iterator.next();
+                ++_currentSubscriber;
+                subscriberScanned();
 
-        return null;
+                if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
+                {
+                    if (subscription.hasInterest(msg))
+                    {
+                        // if the queue is not empty then this client is ready to receive a message.
+                        //FIXME the queue could be full of sent messages.
+                        // Either need to clean all PDQs after sending a message
+                        // OR have a clean up thread that runs the PDQs expunging the messages.
+                        if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+                        {
+                            return subscription;
+                        }
+                    }
+                }
+            }
+
+            return null;
+        }
     }
 
     /** Overridden in test classes. */
@@ -233,5 +265,14 @@
     {
         return _changeLock;
     }
-    
+
+    public void setExclusive(final boolean exclusive)
+    {
+        _exclusive = exclusive;
+    }
+
+    public boolean getExcBoolean()
+    {
+        return _exclusive;
+    }
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java Thu Feb 21 02:09:03 2008
@@ -22,6 +22,8 @@
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -60,7 +62,7 @@
      * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
      * by the message handle.
      */
-    private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
+    private List<AMQQueue> _destinationQueues;
 
     public MessagePublishInfo getMessagePublishInfo()
     {
@@ -74,7 +76,7 @@
 
     public List<AMQQueue> getDestinationQueues()
     {
-        return _destinationQueues;
+        return _destinationQueues == null ? (List<AMQQueue>) Collections.EMPTY_LIST : _destinationQueues;
     }
 
     public void setDestinationQueues(List<AMQQueue> destinationQueues)
@@ -109,6 +111,10 @@
 
     public void addDestinationQueue(AMQQueue queue)
     {
+        if(_destinationQueues == null)
+        {
+            _destinationQueues = new ArrayList<AMQQueue>();
+        }
         _destinationQueues.add(queue);
     }
 

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Thu Feb 21 02:09:03 2008
@@ -49,11 +49,11 @@
     /** Where to put undeliverable messages */
     private final List<RequiredDeliveryException> _returnMessages;
 
-    private Set<Long> _browsedAcks;
+    private final Set<Long> _browsedAcks;
 
     private final MessageStore _messageStore;
 
-    private StoreContext _storeContext;
+    private final StoreContext _storeContext;
 
     /** Whether we are in a transaction */
     private boolean _inTran;

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Thu Feb 21 02:09:03 2008
@@ -23,6 +23,7 @@
 import javax.management.NotCompliantMBeanException;
 
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.AMQBrokerManagerMBean;
 import org.apache.qpid.server.security.access.AccessManager;
@@ -123,7 +124,7 @@
      */
     public VirtualHost(String name, MessageStore store) throws Exception
     {
-        this(name, null, store);
+        this(name, new PropertiesConfiguration(), store);
     }
 
     /**

Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Thu Feb 21 02:09:03 2008
@@ -173,7 +173,7 @@
     public void testQueueDepthAlertWithSubscribers() throws Exception
     {
         protocolSession = new TestMinaProtocolSession();
-        AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore, null);
+        AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore);
         protocolSession.addChannel(channel);
 
         // Create queue

Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Thu Feb 21 02:09:03 2008
@@ -123,7 +123,7 @@
 
 
         TestMinaProtocolSession protocolSession = new TestMinaProtocolSession();
-        AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null);
+        AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore);
         protocolSession.addChannel(channel);
 
         _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false);