You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/19 11:35:25 UTC

svn commit: r497770 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/server/ha...

Author: rgreig
Date: Fri Jan 19 02:35:21 2007
New Revision: 497770

URL: http://svn.apache.org/viewvc?view=rev&rev=497770
Log:
QPID-275 : Patch supplied by Rob Godfrey - Add support for get / purge / qos size / default exchanges and some other small fixes highlighted by the python tests

Removed:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Jan 19 02:35:21 2007
@@ -37,6 +37,7 @@
 import org.apache.qpid.server.txn.LocalTransactionalContext;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.mina.common.ByteBuffer;
 
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,6 +57,8 @@
 
     private long _prefetch_LowWaterMark;
 
+    private long _prefetchSize;
+
     /**
      * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
      * value of this represents the <b>last</b> tag sent out
@@ -108,6 +111,8 @@
 
     private Set<Long> _browsedAcks = new HashSet<Long>();
 
+
+
     public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
             throws AMQException
     {
@@ -151,6 +156,17 @@
         _prefetch_HighWaterMark = prefetchCount;
     }
 
+    public long getPrefetchSize()
+    {
+        return _prefetchSize;
+    }
+
+
+    public void setPrefetchSize(long prefetchSize)
+    {
+        _prefetchSize = prefetchSize;
+    }
+
     public long getPrefetchLowMarkCount()
     {
         return _prefetch_LowWaterMark;
@@ -213,14 +229,15 @@
             throw new AMQException("Received content body without previously receiving a JmsPublishBody");
         }
 
-        // returns true iff the message was delivered (i.e. if all data was
-        // received
         if (_log.isDebugEnabled())
         {
             _log.debug("Content body received on channel " + _channelId);
         }
         try
         {
+
+            // returns true iff the message was delivered (i.e. if all data was
+            // received
             if (_currentMessage.addContentBodyFrame(_storeContext, contentBody))
             {
                 // callback to allow the context to do any post message processing
@@ -269,13 +286,14 @@
      * @param queue   the queue to subscribe to
      * @param session the protocol session of the subscriber
      * @param noLocal
+     * @param exclusive
      * @return the consumer tag. This is returned to the subscriber and used in
      *         subsequent unsubscribe requests
      * @throws ConsumerTagNotUniqueException if the tag is not unique
      * @throws AMQException                  if something goes wrong
      */
     public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
-                                           FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException
+                                           FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
     {
         if (tag == null)
         {
@@ -286,7 +304,7 @@
             throw new ConsumerTagNotUniqueException();
         }
 
-        queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal);
+        queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive);
         _consumerTag2QueueMap.put(tag, queue);
         return tag;
     }
@@ -364,8 +382,10 @@
     /**
      * Called to resend all outstanding unacknowledged messages to this same channel.
      */
-    public void resend(final AMQProtocolSession session) throws AMQException
+    public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
     {
+        final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
+
         _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
         {
             public boolean callback(UnacknowledgedMessage message) throws AMQException
@@ -374,7 +394,20 @@
                 AMQShortString consumerTag = message.consumerTag;
                 AMQMessage msg = message.message;
                 msg.setRedelivered(true);
-                msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
+                if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag))
+                {
+                    msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
+                }
+                else
+                {
+                    // Message has no consumer tag, so was "delivered" to a GET
+                    // or consumer no longer registered
+                    // cannot resend, so re-queue.
+                    if (message.queue != null && (consumerTag == null || requeue))
+                    {
+                        msgToRequeue.add(message);                         
+                    }
+                }
                 // false means continue processing
                 return false;
             }
@@ -383,6 +416,12 @@
             {
             }
         });
+
+        for(UnacknowledgedMessage message : msgToRequeue)
+        {
+            _txnContext.deliver(message.message, message.queue);
+            _unacknowledgedMessageMap.remove(message.deliveryTag);
+        }
     }
 
     /**
@@ -459,8 +498,9 @@
     {
         boolean suspend;
         
-        suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark;
-
+        suspend = ((_prefetch_HighWaterMark != 0) &&  _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
+                 || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
+        
         setSuspended(suspend);
     }
 
@@ -544,5 +584,32 @@
             message.writeReturn(session, _channelId, bouncedMessage.getReplyCode(), new AMQShortString(bouncedMessage.getMessage()));
         }
         _returnMessages.clear();
+    }
+
+
+    public boolean wouldSuspend(AMQMessage msg)
+    {
+        if (isSuspended())
+        {
+            return true;
+        }
+        else
+        {
+            boolean willSuspend = ((_prefetch_HighWaterMark != 0) &&  _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
+            if(!willSuspend)
+            {
+                final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes();
+
+                willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < msg.getSize() + unackedSize);
+            }
+
+
+            if(willSuspend)
+            {
+                setSuspended(true);
+            }
+            return willSuspend;
+        }
+
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java Fri Jan 19 02:35:21 2007
@@ -73,5 +73,7 @@
      * @return a set of delivery tags
      */
     Set<Long> getDeliveryTags();
+
+    public long getUnacknowledgeBytes();
 }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Fri Jan 19 02:35:21 2007
@@ -32,6 +32,8 @@
 {
     private final Object _lock = new Object();
 
+    private long _unackedSize;
+
     private Map<Long, UnacknowledgedMessage> _map;
 
     private long _lastDeliveryTag;
@@ -77,7 +79,8 @@
         {
             for (UnacknowledgedMessage msg : msgs)
             {
-                _map.remove(msg.deliveryTag);
+                remove(msg.deliveryTag);
+
             }
         }
     }
@@ -86,7 +89,14 @@
     {
         synchronized (_lock)
         {
-            return _map.remove(deliveryTag);
+
+            UnacknowledgedMessage message = _map.remove(deliveryTag);
+            if(message != null)
+            {
+                _unackedSize -= message.message.getSize();
+            }
+
+            return message;
         }
     }
 
@@ -113,6 +123,7 @@
         synchronized (_lock)
         {
             _map.put(deliveryTag, message);
+            _unackedSize += message.message.getSize();            
             _lastDeliveryTag = deliveryTag;
         }
     }
@@ -123,6 +134,7 @@
         {
             Collection<UnacknowledgedMessage> currentEntries = _map.values();
             _map = new LinkedHashMap<Long, UnacknowledgedMessage>(_prefetchLimit);
+            _unackedSize = 0l;
             return currentEntries;
         }
     }
@@ -149,6 +161,7 @@
         synchronized (_lock)
         {
             _map.clear();
+            _unackedSize = 0l;
         }
     }
 
@@ -169,6 +182,7 @@
                 }
 
                 it.remove();
+                _unackedSize -= unacked.getValue().message.getSize();
 
                 destination.add(unacked.getValue());
                 if (unacked.getKey() == deliveryTag)
@@ -189,7 +203,10 @@
                 AMQShortString consumerTag = entry.getValue().consumerTag;
                 AMQMessage msg = entry.getValue().message;
 
-                msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag);
+                if(consumerTag != null)
+                {
+                    msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag);
+                }
             }
         }
     }
@@ -223,5 +240,10 @@
                 }
             }
         }
+    }
+
+    public long getUnacknowledgeBytes()
+    {
+        return _unackedSize;
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Fri Jan 19 02:35:21 2007
@@ -38,6 +38,8 @@
      */
     private ConcurrentMap<AMQShortString, Exchange> _exchangeMap = new ConcurrentHashMap<AMQShortString, Exchange>();
 
+    private Exchange _defaultExchange;
+
     public DefaultExchangeRegistry(ExchangeFactory exchangeFactory)
     {
         //create 'standard' exchanges:
@@ -53,9 +55,18 @@
 
     public void registerExchange(Exchange exchange)
     {
+        if(_defaultExchange == null)
+        {
+            setDefaultExchange(exchange);
+        }
         _exchangeMap.put(exchange.getName(), exchange);
     }
 
+    public void setDefaultExchange(Exchange exchange)
+    {
+        _defaultExchange = exchange;
+    }
+
     public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException
     {
         // TODO: check inUse argument
@@ -72,7 +83,16 @@
 
     public Exchange getExchange(AMQShortString name)
     {
-        return _exchangeMap.get(name);
+
+        if(name == null || name.length() == 0)
+        {
+            return _defaultExchange;
+        }
+        else
+        {
+            return _exchangeMap.get(name);
+        }
+
     }
 
     /**
@@ -83,7 +103,7 @@
     public void routeContent(AMQMessage payload) throws AMQException
     {
         final AMQShortString exchange = payload.getPublishBody().exchange;
-        final Exchange exch = _exchangeMap.get(exchange);
+        final Exchange exch = getExchange(exchange);
         // there is a small window of opportunity for the exchange to be deleted in between
         // the BasicPublish being received (where the exchange is validated) and the final
         // content body being received (which triggers this method)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java Fri Jan 19 02:35:21 2007
@@ -38,4 +38,6 @@
     void unregisterExchange(AMQShortString name, boolean inUse) throws ExchangeInUseException, AMQException;
 
     Exchange getExchange(AMQShortString name);
+
+    void setDefaultExchange(Exchange exchange);
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Fri Jan 19 02:35:21 2007
@@ -22,6 +22,8 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.server.AMQChannel;
@@ -66,6 +68,7 @@
         }
         else
         {
+
             AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
 
             if (queue == null)
@@ -73,29 +76,13 @@
                 _log.info("No queue for '" + body.queue + "'");
                 if(body.queue!=null)
                 {
-                    AMQShortString msg = new AMQShortString("No such queue, '" + body.queue + "'");
-                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                    // Be aware of possible changes to parameter order as versions change.
-                    session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
-                        (byte)8, (byte)0,	// AMQP version (major, minor)
-                        BasicConsumeBody.getClazz((byte)8, (byte)0),	// classId
-                        BasicConsumeBody.getMethod((byte)8, (byte)0),	// methodId
-                        AMQConstant.NOT_FOUND.getCode(),	// replyCode
-                        msg));	// replyText
+                    String msg = "No such queue, '" + body.queue + "'";
+                    throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), msg);
                 }
                 else
                 {
-                    AMQShortString msg = new AMQShortString("No queue name provided, no default queue defined.");
-                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                    // Be aware of possible changes to parameter order as versions change.
-                    session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
-                        (byte)8, (byte)0,	// AMQP version (major, minor)
-                        BasicConsumeBody.getClazz((byte)8, (byte)0),	// classId
-                        BasicConsumeBody.getMethod((byte)8, (byte)0),	// methodId
-                        AMQConstant.NOT_ALLOWED.getCode(),	// replyCode
-                        msg));	// replyText
+                    String msg = "No queue name provided, no default queue defined.";
+                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),msg );
                 }
             }
             else
@@ -103,7 +90,7 @@
                 try
                 {
                     AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
-                                                                  body.arguments, body.noLocal);
+                                                                  body.arguments, body.noLocal, body.exclusive);
                     if (!body.nowait)
                     {
                         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -143,6 +130,21 @@
                         AMQConstant.NOT_ALLOWED.getCode(),	// replyCode
                         msg));	// replyText
                 }
+                catch (AMQQueue.ExistingExclusiveSubscription e)
+                {
+                    throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+                                                  "Cannot subscribe to queue "
+                                                          + queue.getName()
+                                                          + " as it already has an existing exclusive consumer");
+                }
+                catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+                                {
+                                    throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+                                                                  "Cannot subscribe to queue "
+                                                                          + queue.getName()
+                                                                          + " exclusively as it already has a consumer");
+                                }
+
             }
         }
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java Fri Jan 19 02:35:21 2007
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.handler;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.BasicPublishBody;
@@ -42,7 +43,6 @@
 
     private static final BasicPublishMethodHandler _instance = new BasicPublishMethodHandler();
 
-    private static final AMQShortString UNKNOWN_EXCHANGE_NAME = new AMQShortString("Unknown exchange name");
 
     public static BasicPublishMethodHandler getInstance()
     {
@@ -74,19 +74,8 @@
         // if the exchange does not exist we raise a channel exception
         if (e == null)
         {
-            protocolSession.closeChannel(evt.getChannelId());
-            // TODO: modify code gen to make getClazz and getMethod public methods rather than protected
-            // then we can remove the hardcoded 0,0
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(),
-                (byte)8, (byte)0,	// AMQP version (major, minor)
-                ChannelCloseBody.getClazz((byte)8, (byte)0),	// classId
-                ChannelCloseBody.getMethod((byte)8, (byte)0),	// methodId
-                500,	// replyCode
-                UNKNOWN_EXCHANGE_NAME);	// replyText
-            protocolSession.writeFrame(cf);
+            throw body.getChannelException(500, "Unknown exchange name");
+
         }
         else
         {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java Fri Jan 19 02:35:21 2007
@@ -44,6 +44,8 @@
                                AMQProtocolSession session, AMQMethodEvent<BasicQosBody> evt) throws AMQException
     {
         session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
+        session.getChannel(evt.getChannelId()).setPrefetchSize(evt.getMethod().prefetchSize);
+
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java Fri Jan 19 02:35:21 2007
@@ -52,6 +52,8 @@
         {
             throw new AMQException("Unknown channel " + evt.getChannelId());
         }
-        channel.resend(protocolSession);
+        BasicRecoverBody body = evt.getMethod();
+        channel.resend(protocolSession, body.requeue);
+
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Fri Jan 19 02:35:21 2007
@@ -56,21 +56,22 @@
                                AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException
     {
         ConnectionOpenBody body = evt.getMethod();
-        AMQShortString contextKey = body.virtualHost;
+        
+
 
         //todo //FIXME The virtual host must be validated by the server for the connection to open-ok
         // See Spec (0.8.2). Section  3.1.2 Virtual Hosts
-        if (contextKey == null)
+        if (protocolSession.getContextKey() == null)
         {
-            contextKey = generateClientID();
+            protocolSession.setContextKey(generateClientID());
         }
-        protocolSession.setContextKey(contextKey);
+
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0,
             (byte)8, (byte)0,	// AMQP version (major, minor)
-            contextKey);	// knownHosts
+            body.virtualHost);	// knownHosts
         stateManager.changeState(AMQState.CONNECTION_OPEN);
         protocolSession.writeFrame(response);
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Fri Jan 19 02:35:21 2007
@@ -76,7 +76,7 @@
             {
                 if(body.passive && ((body.type == null) || body.type.length() ==0))
                 {
-                    throw new AMQChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: " + body.exchange,body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());                    
+                    throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: " + body.exchange);                    
                 }
                 else
                 {
@@ -89,7 +89,7 @@
                     }
                     catch(AMQUnknownExchangeType e)
                     {
-                        throw new AMQConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange: " + body.exchange,body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),e);
+                        throw body.getConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange: " + body.exchange,e);
                     }
                 }
             }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Fri Jan 19 02:35:21 2007
@@ -77,22 +77,19 @@
         {
             body.queue = createName();
         }
+
+        AMQQueue queue = null;
         //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
 
         synchronized (queueRegistry)
         {
-            AMQQueue queue;
+
             if (((queue = queueRegistry.getQueue(body.queue)) == null) )
             {
                 if(body.passive)
                 {
                     String msg = "Queue: " + body.queue + " not found.";
-                    throw new AMQChannelException(AMQConstant.NOT_FOUND.getCode(),
-                                                                                  msg,
-                                                                                  body.getClazz(),
-                                                                                  body.getMethod(),
-                                                                                  (byte)8,
-                                                                                  (byte)0 	);
+                    throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(),msg );
 
                 }
                 else
@@ -112,9 +109,16 @@
                     }
                 }
             }
+            else if(queue.getOwner() != null && !protocolSession.getContextKey().equals(queue.getOwner()))
+            {
+                // todo - constant
+                throw body.getChannelException(405, "Cannot declare queue, as exclusive queue with same name declared on another connection");        
+
+            }
             //set this as the default queue on the channel:
             protocolSession.getChannel(evt.getChannelId()).setDefaultQueue(queue);
         }
+
         if (!body.nowait)
         {
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -122,8 +126,8 @@
             // Be aware of possible changes to parameter order as versions change.
             AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(),
                 (byte)8, (byte)0,	// AMQP version (major, minor)
-                0L, // consumerCount
-                0L, // messageCount
+                queue.getConsumerCount(), // consumerCount
+                queue.getMessageCount(), // messageCount
                 body.queue); // queue
             _log.info("Queue " + body.queue + " declared successfully");
             protocolSession.writeFrame(response);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Fri Jan 19 02:35:21 2007
@@ -34,6 +34,7 @@
 import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.protocol.AMQConstant;
 
 public class QueueDeleteHandler  implements StateAwareMethodListener<QueueDeleteBody>
@@ -84,15 +85,12 @@
         {
             if(body.ifEmpty && !queue.isEmpty())
             {
-                AMQShortString msg = new AMQShortString("Queue: " + body.queue + " is not empty.");
-                // TODO - Error code
-                session.writeFrame(ChannelCloseBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0, body.getClazz(), body.getMethod(), 406, msg	));
+                throw body.getChannelException(406, "Queue: " + body.queue + " is not empty." );
             }
             else if(body.ifUnused && !queue.isUnused())
-            {
-                AMQShortString msg = new AMQShortString("Queue: " + body.queue + " is still used.");
+            {                
                 // TODO - Error code
-                session.writeFrame(ChannelCloseBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0, body.getClazz(), body.getMethod(), 406, msg	));
+                throw body.getChannelException(406, "Queue: " + body.queue + " is still used." );
 
             }
             else

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Fri Jan 19 02:35:21 2007
@@ -57,7 +57,7 @@
             protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
             //Now resend all the unacknowledged messages back to the original subscribers.
             //(Must be done after the TxnRollback-ok response).
-            channel.resend(protocolSession);
+            channel.resend(protocolSession, false);
         }catch(AMQException e){
             throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
         }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Fri Jan 19 02:35:21 2007
@@ -27,6 +27,7 @@
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
@@ -57,6 +58,9 @@
 {
     private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
 
+    private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
+
+
     private final IoSession _minaProtocolSession;
 
     private AMQShortString _contextKey;
@@ -218,31 +222,36 @@
                                                                                     (AMQMethodBody) frame.bodyFrame);
         try
         {
-            boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
-
-            if(!_frameListeners.isEmpty())
+            try
             {
-                for (AMQMethodListener listener : _frameListeners)
+                boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
+
+                if(!_frameListeners.isEmpty())
                 {
-                    wasAnyoneInterested = listener.methodReceived(evt) ||
-                                          wasAnyoneInterested;
+                    for (AMQMethodListener listener : _frameListeners)
+                    {
+                        wasAnyoneInterested = listener.methodReceived(evt) ||
+                                              wasAnyoneInterested;
+                    }
+                }
+                if (!wasAnyoneInterested)
+                {
+                    throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
                 }
             }
-            if (!wasAnyoneInterested)
+            catch (AMQChannelException e)
             {
-                throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
+                _logger.error("Closing channel due to: " + e.getMessage());
+                writeFrame(e.getCloseFrame(frame.channel));
+                closeChannel(frame.channel);
+            }
+            catch (AMQConnectionException e)
+            {
+                _logger.error("Closing connection due to: " + e.getMessage());
+                closeSession();
+                writeFrame(e.getCloseFrame(frame.channel));
             }
         }
-        catch (AMQChannelException e)
-        {
-            _logger.error("Closing channel due to: " + e.getMessage());
-            writeFrame(e.getCloseFrame(frame.channel));
-        }
-        catch (AMQConnectionException e)
-        {
-            _logger.error("Closing connection due to: " + e.getMessage());
-            writeFrame(e.getCloseFrame(frame.channel));
-        }        
         catch (Exception e)
         {
             _stateManager.error(e);
@@ -516,6 +525,10 @@
     public void setClientProperties(FieldTable clientProperties)
     {
         _clientProperties = clientProperties;
+        if((_clientProperties != null) && (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null))
+        {
+            setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
+        }
     }
 
     /**
@@ -536,5 +549,11 @@
     public boolean amqpVersionEquals(byte major, byte minor)
     {
         return _major == major && _minor == minor;
+    }
+
+
+    public Object getClientIdentifier()
+    {
+        return _minaProtocolSession.getRemoteAddress();    
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Fri Jan 19 02:35:21 2007
@@ -124,4 +124,6 @@
     FieldTable getClientProperties();
 
     void setClientProperties(FieldTable clientProperties);
+
+    Object getClientIdentifier();
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java Fri Jan 19 02:35:21 2007
@@ -33,6 +33,8 @@
         define(registry, factory, ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
         define(registry, factory, ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
         define(registry, factory, ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
+
+        registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME));
     }
 
     private void define(ExchangeRegistry r, ExchangeFactory f,

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Fri Jan 19 02:35:21 2007
@@ -43,8 +43,6 @@
 {
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
 
-    public static final String JMS_MESSAGE = "jms.message";
-
     /**
      * Used in clustering
      */
@@ -75,6 +73,8 @@
 
     private TransientMessageData _transientMessageData = new TransientMessageData();
 
+
+
     /**
      * Used to iterate through all the body frames associated with this message. Will not
      * keep all the data in memory therefore is memory-efficient.
@@ -550,6 +550,7 @@
         {
             SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
                                                                              contentHeader);
+
             protocolSession.writeFrame(compositeBlock);
         }
         else
@@ -582,6 +583,50 @@
 
     }
 
+    public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
+    {
+        ByteBuffer deliver = createEncodedGetOkFrame(channelId, deliveryTag, queueSize);
+        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                      getContentHeaderBody());
+
+        final int bodyCount = _messageHandle.getBodyCount(_messageId);
+        if(bodyCount == 0)
+        {
+            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+                                                                             contentHeader);
+            protocolSession.writeFrame(compositeBlock);
+        }
+        else
+        {
+
+
+            //
+            // Optimise the case where we have a single content body. In that case we create a composite block
+            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+            //
+            ContentBody cb = _messageHandle.getContentBody(_messageId, 0);
+
+            AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
+            AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+            protocolSession.writeFrame(compositeBlock);
+
+            //
+            // Now start writing out the other content bodies
+            //
+            for(int i = 1; i < bodyCount; i++)
+            {
+                cb = _messageHandle.getContentBody(_messageId, i);
+                protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb));
+            }
+
+
+        }
+
+
+    }
+
+
     private ByteBuffer createEncodedDeliverFrame(int channelId, long deliveryTag, AMQShortString consumerTag)
             throws AMQException
     {
@@ -595,6 +640,21 @@
         return buf;
     }
 
+    private ByteBuffer createEncodedGetOkFrame(int channelId, long deliveryTag, int queueSize)
+            throws AMQException
+    {
+        BasicPublishBody pb = getPublishBody();
+        AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, (byte) 8, (byte) 0,
+                                                                deliveryTag, pb.exchange,
+                                                                queueSize,
+                                                                _messageHandle.isRedelivered(),
+                                                                pb.routingKey);
+        ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
+        getOkFrame.writePayload(buf);
+        buf.flip();
+        return buf;
+    }
+
     private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, AMQShortString replyText) throws AMQException
     {
         AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, (byte) 8, (byte) 0, getPublishBody().exchange,
@@ -642,6 +702,24 @@
             protocolSession.writeFrame(bodyFrameIterator.next());
         }
     }
+
+
+    public long getSize()
+    {
+        try
+        {
+            long size = getContentHeaderBody().bodySize;
+
+            return size;
+        }
+        catch (AMQException e)
+        {
+            _log.error(e);
+            return 0;
+        }
+
+    }    
+
 
     public String toString()
     {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Jan 19 02:35:21 2007
@@ -29,11 +29,14 @@
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.AMQChannel;
 
 import javax.management.JMException;
 import java.text.MessageFormat;
 import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like
@@ -41,6 +44,30 @@
  */
 public class AMQQueue implements Managable, Comparable
 {
+
+    public static final class ExistingExclusiveSubscription extends AMQException
+    {
+
+        public ExistingExclusiveSubscription()
+        {
+            super("");
+        }
+    }
+
+    public static final class ExistingSubscriptionPreventsExclusive extends AMQException
+    {
+
+        public ExistingSubscriptionPreventsExclusive()
+        {
+            super("");
+        }
+    }
+
+    private static final ExistingExclusiveSubscription EXISTING_EXCLUSIVE = new ExistingExclusiveSubscription();
+    private static final ExistingSubscriptionPreventsExclusive EXISTING_SUBSCRIPTION = new ExistingSubscriptionPreventsExclusive();
+
+
+
     private static final Logger _logger = Logger.getLogger(AMQQueue.class);
 
     private final AMQShortString _name;
@@ -64,6 +91,11 @@
 
     private final SubscriptionFactory _subscriptionFactory;
 
+    private final AtomicInteger _subscriberCount = new AtomicInteger();
+
+    private final AtomicBoolean _isExclusive = new AtomicBoolean();
+
+
     /**
      * Manages message delivery.
      */
@@ -187,31 +219,7 @@
         _managedObject.register();
         _subscribers = subscribers;
         _subscriptionFactory = subscriptionFactory;
-
-        //fixme - Make this configurable via the broker config.xml
-        if (System.getProperties().getProperty("deliverymanager") != null)
-        {
-            if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
-            {
-                _logger.info("Using ConcurrentSelectorDeliveryManager");
-                _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
-            }
-            else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
-            {
-                _logger.info("Using ConcurrentDeliveryManager");
-                _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
-            }
-            else
-            {
-                _logger.info("Using SynchronizedDeliveryManager");
-                _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
-            }
-        }
-        else
-        {
-            _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager");
-            _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
-        }
+		_deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
     }
 
     private AMQQueueMBean createMBean() throws AMQException
@@ -352,9 +360,9 @@
     /**
      * removes all the messages from the queue.
      */
-    public void clearQueue(StoreContext storeContext) throws AMQException
+    public long clearQueue(StoreContext storeContext) throws AMQException
     {
-        _deliveryMgr.clearAllMessages(storeContext);
+        return _deliveryMgr.clearAllMessages(storeContext);
     }
 
     public void bind(AMQShortString routingKey, Exchange exchange)
@@ -362,14 +370,30 @@
         _bindings.addBinding(routingKey, exchange);
     }
 
-    public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks, FieldTable filters) throws AMQException
-    {
-        registerProtocolSession(ps, channel, consumerTag, acks, filters, false);
-    }
 
-    public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal)
+    public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks,
+                                        FieldTable filters, boolean noLocal, boolean exclusive)
             throws AMQException
     {
+        if(incrementSubscriberCount() > 1)
+        {
+            if(isExclusive())
+            {
+                decrementSubscriberCount();
+                throw EXISTING_EXCLUSIVE;
+            }
+            else if(exclusive)
+            {
+                decrementSubscriberCount();
+                throw EXISTING_SUBSCRIPTION;
+            }
+
+        }
+        else if(exclusive)
+        {
+            setExclusive(true);
+        }
+
         debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
 
         Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
@@ -385,6 +409,28 @@
         _subscribers.addSubscriber(subscription);
     }
 
+
+    private boolean isExclusive()
+    {
+        return _isExclusive.get();
+    }
+
+    private void setExclusive(boolean exclusive)
+    {
+        _isExclusive.set(exclusive);
+    }
+
+    private int incrementSubscriberCount()
+    {
+        return _subscriberCount.incrementAndGet();
+    }
+
+    private int decrementSubscriberCount()
+    {
+        return _subscriberCount.decrementAndGet();
+    }
+
+
     public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException
     {
         debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag,
@@ -400,6 +446,10 @@
                                    " and protocol session key " + ps.getKey() + " not registered with queue " + this);
         }
 
+        setExclusive(false);
+        decrementSubscriberCount();
+
+
         // if we are eligible for auto deletion, unregister from the queue registry
         if (_autoDelete && _subscribers.isEmpty())
         {
@@ -454,6 +504,23 @@
         delete();
     }
 
+    public void processGet(StoreContext storeContext, AMQMessage msg) throws AMQException
+    {
+        _deliveryMgr.deliver(storeContext, getName(), msg);
+        try
+        {
+            msg.checkDeliveredToConsumer();
+            updateReceivedMessageCount(msg);
+        }
+        catch (NoConsumersException e)
+        {
+            // as this message will be returned, it should be removed
+            // from the queue:
+            dequeue(storeContext, msg);
+        }
+    }
+
+
     public void process(StoreContext storeContext, AMQMessage msg) throws AMQException
     {
         _deliveryMgr.deliver(storeContext, getName(), msg);
@@ -547,4 +614,12 @@
             _logger.debug(MessageFormat.format(msg, args));
         }
     }
+
+    public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException
+    {
+        return _deliveryMgr.performGet(session, channel, acks);
+    }
+
+    
+
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Fri Jan 19 02:35:21 2007
@@ -28,6 +28,8 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.configuration.Configurator;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
 
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -52,6 +54,9 @@
      * Holds any queued messages
      */
     private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+
+    private final ReentrantLock _messageAccessLock = new ReentrantLock();
+
     //private int _messageCount;
     /**
      * Ensures that only one asynchronous task is running for this manager at
@@ -169,6 +174,56 @@
         }
     }
 
+    public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException
+    {
+        AMQMessage msg = getNextMessage();
+        if(msg == null)
+        {
+            return false;
+        }
+        else
+        {
+
+            try
+            {
+                // if we do not need to wait for client acknowledgements
+                // we can decrement the reference count immediately.
+
+                // By doing this _before_ the send we ensure that it
+                // doesn't get sent if it can't be dequeued, preventing
+                // duplicate delivery on recovery.
+
+                // The send may of course still fail, in which case, as
+                // the message is unacked, it will be lost.
+                if (!acks)
+                {
+                    if (_log.isDebugEnabled())
+                    {
+                        _log.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+                    }
+                    _queue.dequeue(channel.getStoreContext(), msg);
+                }
+                synchronized(channel)
+                {
+                    long deliveryTag = channel.getNextDeliveryTag();
+
+                    if (acks)
+                    {
+                        channel.addUnacknowledgedMessage(msg, deliveryTag, null, _queue);
+                    }
+
+                    msg.writeGetOk(protocolSession, channel.getChannelId(), deliveryTag, _queue.getMessageCount());
+                }
+            }
+            finally
+            {
+                msg.setDeliveredToConsumer();
+            }
+            return true;
+
+        }
+    }
+
     public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
     {
         AMQMessage msg = poll();
@@ -178,22 +233,35 @@
         }
     }
 
-    public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException
+    public synchronized long clearAllMessages(StoreContext storeContext) throws AMQException
     {
+        long count = 0;
         AMQMessage msg = poll();
         while (msg != null)
         {
             msg.dequeue(storeContext, _queue);
+            count++;
             msg = poll();
         }
+        return count;
+    }
+
+    public synchronized AMQMessage getNextMessage() throws AMQException
+    {
+        return getNextMessage(_messages);
     }
 
 
-    private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub)
+    private AMQMessage getNextMessage(Queue<AMQMessage> messages)
+    {
+        return getNextMessage(messages, false);
+    }
+
+    private AMQMessage getNextMessage(Queue<AMQMessage> messages, boolean browsing)
     {
         AMQMessage message = messages.peek();
 
-        while (message != null && (sub.isBrowser() || message.taken()))
+        while (message != null && (browsing || message.taken()))
         {
             //remove the already taken message
             messages.poll();
@@ -208,7 +276,7 @@
         AMQMessage message = null;
         try
         {
-            message = getNextMessage(messageQueue, sub);
+            message = getNextMessage(messageQueue, sub.isBrowser());
 
             // message will be null if we have no messages in the messageQueue.
             if (message == null)
@@ -287,6 +355,7 @@
         {
             _log.debug(id() + "deliver :" + msg);
         }
+        msg.release();
 
         //Check if we have someone to deliver the message to.
         _lock.lock();

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Fri Jan 19 02:35:21 2007
@@ -23,6 +23,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
 
 import java.util.concurrent.Executor;
 import java.util.List;
@@ -72,9 +74,11 @@
 
     void removeAMessageFromTop(StoreContext storeContext) throws AMQException;
 
-    void clearAllMessages(StoreContext storeContext) throws AMQException;
+    long clearAllMessages(StoreContext storeContext) throws AMQException;
 
     List<AMQMessage> getMessages();
 
     void populatePreDeliveryQueue(Subscription subscription);
+
+    boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException;
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Fri Jan 19 02:35:21 2007
@@ -45,4 +45,6 @@
     void close();
 
     boolean isBrowser();
+
+    boolean wouldSuspend(AMQMessage msg);
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Fri Jan 19 02:35:21 2007
@@ -66,6 +66,7 @@
     private final boolean _isBrowser;
     private final Boolean _autoClose;
     private boolean _closed = false;
+    private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
 
     public static class Factory implements SubscriptionFactory
     {
@@ -300,37 +301,54 @@
     {
         if (_noLocal)
         {
+            boolean isLocal;
             // We don't want local messages so check to see if message is one we sent
-            Object localInstance = protocolSession.getClientProperties().getObject(ClientProperties.instance.toString());
-            Object msgInstance = msg.getPublisher().getClientProperties().getObject(ClientProperties.instance.toString());
+            Object localInstance;
+            Object msgInstance;
 
-            if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
+            if((protocolSession.getClientProperties() != null) &&
+                 (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
             {
-                if (_logger.isTraceEnabled())
+                if((msg.getPublisher().getClientProperties() != null) &&
+                     (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
                 {
-                    _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
-                                  System.identityHashCode(msg) + ")");
+                    if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
+                    {
+                        if (_logger.isTraceEnabled())
+                        {
+                            _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
+                                          System.identityHashCode(msg) + ")");
+                        }
+                        return false;
+                    }
                 }
-                return false;
             }
-            else // if not then filter the message.
+            else
             {
-                if (_logger.isTraceEnabled())
+                localInstance = protocolSession.getClientIdentifier();
+                msgInstance = msg.getPublisher().getClientIdentifier();
+                if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
                 {
-                    _logger.trace("(" + System.identityHashCode(this) + ") local message(" + System.identityHashCode(msg) +
-                                  ") but not ours so filtering");
+                    if (_logger.isTraceEnabled())
+                    {
+                        _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
+                                      System.identityHashCode(msg) + ")");
+                    }
+                    return false;
                 }
-                return checkFilters(msg);
+
             }
+
+
         }
-        else
+
+
+        if (_logger.isTraceEnabled())
         {
-            if (_logger.isTraceEnabled())
-            {
-                _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
-            }
-            return checkFilters(msg);
+            _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
         }
+        return checkFilters(msg);
+
     }
 
     private boolean checkFilters(AMQMessage msg)
@@ -391,6 +409,11 @@
     public boolean isBrowser()
     {
         return _isBrowser;
+    }
+
+    public boolean wouldSuspend(AMQMessage msg)
+    {
+        return channel.wouldSuspend(msg);
     }
 
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Fri Jan 19 02:35:21 2007
@@ -137,7 +137,7 @@
             ++_currentSubscriber;
             subscriberScanned();
 
-            if (!subscription.isSuspended())
+            if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
             {
                 if (subscription.hasInterest(msg))
                 {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Fri Jan 19 02:35:21 2007
@@ -21,6 +21,8 @@
 package org.apache.qpid.server.state;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.handler.*;
@@ -28,6 +30,7 @@
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.log4j.Logger;
 
 import java.util.HashMap;
@@ -118,12 +121,14 @@
         frame2handlerMap.put(BasicAckBody.class, BasicAckMethodHandler.getInstance());
         frame2handlerMap.put(BasicRecoverBody.class, BasicRecoverMethodHandler.getInstance());
         frame2handlerMap.put(BasicConsumeBody.class, BasicConsumeMethodHandler.getInstance());
+        frame2handlerMap.put(BasicGetBody.class, BasicGetMethodHandler.getInstance());
         frame2handlerMap.put(BasicCancelBody.class, BasicCancelMethodHandler.getInstance());
         frame2handlerMap.put(BasicPublishBody.class, BasicPublishMethodHandler.getInstance());
         frame2handlerMap.put(BasicQosBody.class, BasicQosHandler.getInstance());
         frame2handlerMap.put(QueueBindBody.class, QueueBindHandler.getInstance());
         frame2handlerMap.put(QueueDeclareBody.class, QueueDeclareHandler.getInstance());
         frame2handlerMap.put(QueueDeleteBody.class, QueueDeleteHandler.getInstance());
+        frame2handlerMap.put(QueuePurgeBody.class, QueuePurgeHandler.getInstance());
         frame2handlerMap.put(ChannelFlowBody.class, ChannelFlowHandler.getInstance());
         frame2handlerMap.put(TxSelectBody.class, TxSelectHandler.getInstance());
         frame2handlerMap.put(TxCommitBody.class, TxCommitHandler.getInstance());
@@ -168,10 +173,24 @@
         StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod());
         if (handler != null)
         {
+
+            checkChannel(evt, _protocolSession);
+
             handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt);
             return true;
         }
         return false;
+    }
+
+    private <B extends AMQMethodBody> void checkChannel(AMQMethodEvent<B> evt, AMQProtocolSession protocolSession)
+            throws AMQException
+    {
+        if(evt.getChannelId() != 0
+                && !(evt.getMethod() instanceof ChannelOpenBody)
+                && protocolSession.getChannel(evt.getChannelId()) == null)
+        {
+            throw evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR.getCode(),"No such channel: " + evt.getChannelId());
+        }
     }
 
     protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState currentState,

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Fri Jan 19 02:35:21 2007
@@ -269,14 +269,15 @@
 
     private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
     {
+        byte[] url = jmsMsg.getBytesProperty(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName());
+        Destination dest = AMQDestination.createDestination(url);
+        jmsMsg.setJMSDestination(dest);
+
         if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
         {
             _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
-            byte[] url = jmsMsg.getBytesProperty(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName());
-            Destination dest = AMQDestination.createDestination(url);
-            jmsMsg.setJMSDestination(dest);
-
         }
+
         _session.setInRecovery(false);
     }
 

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java Fri Jan 19 02:35:21 2007
@@ -134,11 +134,12 @@
 
     public boolean isBrowser()
     {
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
+        return false;
     }
 
-    public void sendNextMessage(AMQQueue queue)
+    public boolean wouldSuspend(AMQMessage msg)
     {
-
+        return _suspended;
     }
+
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java Fri Jan 19 02:35:21 2007
@@ -32,6 +32,7 @@
     /* AMQP version for which exception ocurred */
     private final byte major;
     private final byte minor;
+    boolean _closeConnetion;
 
     public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t)
     {
@@ -51,9 +52,12 @@
         this.minor = minor;
     }
 
+
+
     public AMQFrame getCloseFrame(int channel)
     {
         return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode(), new AMQShortString(getMessage()));
     }
+
 
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java Fri Jan 19 02:35:21 2007
@@ -22,6 +22,7 @@
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
 
 public abstract class AMQMethodBody extends AMQBody
 {
@@ -101,4 +102,17 @@
     {
         return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, cause);
     }
+
+    public AMQConnectionException getConnectionException(int code, String message)
+    {
+        return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor);
+    }
+
+
+
+    public AMQConnectionException getConnectionException(int code, String message, Throwable cause)
+    {
+        return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause);
+    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Fri Jan 19 02:35:21 2007
@@ -78,7 +78,7 @@
         _protocolSession = new MockProtocolSession(_messageStore);
         _protocolSession.addChannel(_channel);
 
-        _queue.registerProtocolSession(_protocolSession, 1, new AMQShortString("test"), false, null);
+        _queue.registerProtocolSession(_protocolSession, 1, new AMQShortString("test"), false, null,false,false);
         assertTrue(_queueMBean.getActiveConsumerCount() == 1);
 
         SubscriptionSet _subscribers = (SubscriptionSet) mgr;

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java Fri Jan 19 02:35:21 2007
@@ -53,7 +53,7 @@
 
     public ConcurrencyTest() throws Exception
     {
-        _deliveryMgr = new ConcurrentDeliveryManager(_subscriptionMgr, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false,
+        _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscriptionMgr, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false,
                                                                           new DefaultQueueRegistry()));
     }
 

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java Fri Jan 19 02:35:21 2007
@@ -172,8 +172,6 @@
     public static junit.framework.Test suite()
     {
         TestSuite suite = new TestSuite();
-        suite.addTestSuite(ConcurrentDeliveryManagerTest.class);
-        suite.addTestSuite(SynchronizedDeliveryManagerTest.class);
         return suite;
     }
 }

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java Fri Jan 19 02:35:21 2007
@@ -132,4 +132,9 @@
     public void setClientProperties(FieldTable clientProperties)
     {
     }
+	
+	public Object getClientIdentifier()
+	{
+		return null;
+	}
 }

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Fri Jan 19 02:35:21 2007
@@ -67,6 +67,12 @@
         return isSuspended;
     }
 
+	public boolean wouldSuspend(AMQMessage msg)
+    {
+        return isSuspended;
+    }
+
+	
     public void queueDeleted(AMQQueue queue)
     {
     }