You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/02/16 00:23:52 UTC

svn commit: r508235 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/server/que...

Author: rgodfrey
Date: Thu Feb 15 15:23:48 2007
New Revision: 508235

URL: http://svn.apache.org/viewvc?view=rev&rev=508235
Log:
QPID-366 : Reference counting not being decremented correctly and other persistence issues 

Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Feb 15 15:23:48 2007
@@ -43,6 +43,7 @@
 import org.apache.qpid.server.exchange.MessageRouter;
 import org.apache.qpid.server.exchange.NoRouteException;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageHandleFactory;
@@ -112,7 +113,7 @@
      * A context used by the message store enabling it to track context for a given channel even across
      * thread boundaries
      */
-    private final StoreContext _storeContext = new StoreContext();
+    private final StoreContext _storeContext;
 
     private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
 
@@ -120,12 +121,16 @@
 
     private Set<Long> _browsedAcks = new HashSet<Long>();
 
+    private final AMQProtocolSession _session;
 
 
-    public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
+
+    public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
             throws AMQException
     {
+        _session = session;
         _channelId = channelId;
+        _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
         _prefetch_HighWaterMark = DEFAULT_PREFETCH;
         _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
         _messageStore = messageStore;
@@ -338,7 +343,8 @@
         _txnContext.rollback();
         unsubscribeAllConsumers(session);
         requeue();
-		_txnContext.commit();
+        _txnContext.commit();
+
     }
 
     private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
@@ -386,8 +392,10 @@
                 _txnContext.deliver(unacked.message, unacked.queue);
             }
         }
+
     }
 
+
     /**
      * Called to resend all outstanding unacknowledged messages to this same channel.
      */
@@ -403,7 +411,7 @@
                 AMQShortString consumerTag = message.consumerTag;
                 AMQMessage msg = message.message;
                 msg.setRedelivered(true);
-                if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag))
+                if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag) && !isSuspended())
                 {
                     msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
                 }
@@ -417,6 +425,7 @@
                         msgToRequeue.add(message);                         
                     }
                 }
+                
                 // false means continue processing
                 return false;
             }
@@ -430,6 +439,7 @@
         {
             _txnContext.deliver(message.message, message.queue);
             _unacknowledgedMessageMap.remove(message.deliveryTag);
+            message.message.decrementReference(_storeContext);
         }
     }
 
@@ -559,6 +569,8 @@
     public void rollback() throws AMQException
     {
         _txnContext.rollback();
+
+
     }
 
     public String toString()

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java Thu Feb 15 15:23:48 2007
@@ -100,6 +100,7 @@
         //make persistent changes, i.e. dequeue and decrementReference
         for (UnacknowledgedMessage msg : _unacked)
         {
+            msg.restoreTransientMessageData();
             msg.discard(storeContext);
         }
     }
@@ -112,6 +113,7 @@
         //in memory (persistent changes will be rolled back by store)
         for (UnacknowledgedMessage msg : _unacked)
         {
+            msg.clearTransientMessageData();
             msg.message.incrementReference();
         }
     }
@@ -120,6 +122,11 @@
     {
         //remove the unacked messages from the channels map
         _map.remove(_unacked);
+        for (UnacknowledgedMessage msg : _unacked)
+        {
+            msg.clearTransientMessageData();        
+        }
+
     }
 
     public void rollback(StoreContext storeContext)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java Thu Feb 15 15:23:48 2007
@@ -50,5 +50,15 @@
         }
         message.decrementReference(storeContext);
     }
+
+    public void restoreTransientMessageData() throws AMQException
+    {
+        message.restoreTransientMessageData();
+    }
+
+    public void clearTransientMessageData()
+    {
+        message.clearTransientMessageData();
+    }
 }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java Thu Feb 15 15:23:48 2007
@@ -49,7 +49,7 @@
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
 
-        final AMQChannel channel = new AMQChannel(evt.getChannelId(), virtualHost.getMessageStore(),
+        final AMQChannel channel = new AMQChannel(session,evt.getChannelId(), virtualHost.getMessageStore(),
                                                   virtualHost.getExchangeRegistry());
         session.addChannel(channel);
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Feb 15 15:23:48 2007
@@ -20,10 +20,7 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -111,7 +108,7 @@
         {
             try
             {
-                return _index < _messageHandle.getBodyCount(_messageId) - 1;
+                return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1;
             }
             catch (AMQException e)
             {
@@ -124,7 +121,7 @@
         {
             try
             {
-                ContentBody cb = _messageHandle.getContentBody(_messageId, ++_index);
+                ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index);
                 return ContentBody.createAMQFrame(_channel, cb);
             }
             catch (AMQException e)
@@ -141,6 +138,11 @@
         }
     }
 
+    private StoreContext getStoreContext()
+    {
+        return _txnContext.getStoreContext();
+    }
+
     private class BodyContentIterator implements Iterator<ContentBody>
     {
 
@@ -150,7 +152,7 @@
         {
             try
             {
-                return _index < _messageHandle.getBodyCount(_messageId) - 1;
+                return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1;
             }
             catch (AMQException e)
             {
@@ -163,7 +165,7 @@
         {
             try
             {
-                return _messageHandle.getContentBody(_messageId, ++_index);
+                return _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index);
             }
             catch (AMQException e)
             {
@@ -201,10 +203,11 @@
      * @param factory
      * @throws AMQException
      */
-    public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory) throws AMQException
+    public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) throws AMQException
     {
         _messageId = messageId;
         _messageHandle = factory.createMessageHandle(messageId, store, true);
+        _txnContext = txnConext;
         _transientMessageData = null;
     }
 
@@ -276,7 +279,7 @@
         }
         else
         {
-            return _messageHandle.getContentHeaderBody(_messageId);
+            return _messageHandle.getContentHeaderBody(getStoreContext(),_messageId);
         }
     }
 
@@ -342,14 +345,16 @@
         _referenceCount.incrementAndGet();
         if (_log.isDebugEnabled())
         {
-            _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount);
+
+            _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + "   " +  Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+
         }
     }
 
     /**
      * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
      * message store.
-     *
+     *                                                                                                            
      * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
      *                                 failed
      */
@@ -365,7 +370,9 @@
             {
                 if (_log.isDebugEnabled())
                 {
-                    _log.debug("Ref count on message " + _messageId + " is zero; removing message");
+                    _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+
+
                 }
 
                 // must check if the handle is null since there may be cases where we decide to throw away a message
@@ -386,7 +393,7 @@
         {
             if (_log.isDebugEnabled())
             {
-                _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId);
+                _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId+ "\n" +  Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
                 if (_referenceCount.get() < 0)
                 {
                     Thread.dumpStack();
@@ -475,7 +482,7 @@
         }
         else
         {
-            return _messageHandle.isPersistent(_messageId);
+            return _messageHandle.isPersistent(getStoreContext(),_messageId);
         }
     }
 
@@ -504,7 +511,7 @@
         }
         else
         {
-            pb = _messageHandle.getPublishBody(_messageId);
+            pb = _messageHandle.getPublishBody(getStoreContext(),_messageId);
         }
         return pb;
     }
@@ -541,7 +548,7 @@
         List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues();
         if (_log.isDebugEnabled())
         {
-            _log.debug("Delivering message " + _messageId);
+            _log.debug("Delivering message " + _messageId + " to " + destinationQueues);
         }
         try
         {
@@ -575,7 +582,7 @@
         AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
                                                                       getContentHeaderBody());
 
-        final int bodyCount = _messageHandle.getBodyCount(_messageId);
+        final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId);
         if(bodyCount == 0)
         {
             SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -591,7 +598,7 @@
             // Optimise the case where we have a single content body. In that case we create a composite block
             // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
             //
-            ContentBody cb = _messageHandle.getContentBody(_messageId, 0);
+            ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0);
 
             AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
             AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
@@ -603,7 +610,7 @@
             //
             for(int i = 1; i < bodyCount; i++)
             {
-                cb = _messageHandle.getContentBody(_messageId, i);
+                cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i);
                 protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb));
             }
 
@@ -619,7 +626,7 @@
         AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
                                                                       getContentHeaderBody());
 
-        final int bodyCount = _messageHandle.getBodyCount(_messageId);
+        final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId);
         if(bodyCount == 0)
         {
             SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -634,7 +641,7 @@
             // Optimise the case where we have a single content body. In that case we create a composite block
             // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
             //
-            ContentBody cb = _messageHandle.getContentBody(_messageId, 0);
+            ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0);
 
             AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
             AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
@@ -646,7 +653,7 @@
             //
             for(int i = 1; i < bodyCount; i++)
             {
-                cb = _messageHandle.getContentBody(_messageId, i);
+                cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i);
                 protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb));
             }
 
@@ -749,11 +756,28 @@
         }
         catch (AMQException e)
         {
-            _log.error(e);
+            _log.error(e.toString(),e);
             return 0;
         }
 
     }    
+
+
+
+    public void restoreTransientMessageData() throws AMQException
+    {
+        TransientMessageData transientMessageData = new TransientMessageData();
+        transientMessageData.setPublishBody(getPublishBody());
+        transientMessageData.setContentHeaderBody(getContentHeaderBody());
+        transientMessageData.addBodyLength(getContentHeaderBody().getSize());
+        _transientMessageData = transientMessageData; 
+    }
+
+
+    public void clearTransientMessageData()
+    {
+        _transientMessageData = null;
+    }
 
 
     public String toString()

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java Thu Feb 15 15:23:48 2007
@@ -35,17 +35,17 @@
  */
 public interface AMQMessageHandle
 {
-    ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException;
+    ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException;
 
     /**
      * @return the number of body frames associated with this message
      */
-    int getBodyCount(Long messageId) throws AMQException;
+    int getBodyCount(StoreContext context, Long messageId) throws AMQException;
 
     /**
      * @return the size of the body
      */
-    long getBodySize(Long messageId) throws AMQException;
+    long getBodySize(StoreContext context, Long messageId) throws AMQException;
 
     /**
      * Get a particular content body
@@ -53,17 +53,17 @@
      * @return a content body
      * @throws IllegalArgumentException if the index is invalid
      */
-    ContentBody getContentBody(Long messageId, int index) throws IllegalArgumentException, AMQException;
+    ContentBody getContentBody(StoreContext context, Long messageId, int index) throws IllegalArgumentException, AMQException;
 
     void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException;
 
-    BasicPublishBody getPublishBody(Long messageId) throws AMQException;
+    BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException;
 
     boolean isRedelivered();
 
     void setRedelivered(boolean redelivered);
 
-    boolean isPersistent(Long messageId) throws AMQException;
+    boolean isPersistent(StoreContext context, Long messageId) throws AMQException;
 
     void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
                                         ContentHeaderBody contentHeaderBody)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java Thu Feb 15 15:23:48 2007
@@ -49,22 +49,22 @@
     {
     }
 
-    public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException
+    public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException
     {
         return _contentHeaderBody;
     }
 
-    public int getBodyCount(Long messageId)
+    public int getBodyCount(StoreContext context, Long messageId)
     {
         return _contentBodies.size();
     }
 
-    public long getBodySize(Long messageId) throws AMQException
+    public long getBodySize(StoreContext context, Long messageId) throws AMQException
     {
-        return getContentHeaderBody(messageId).bodySize;
+        return getContentHeaderBody(context, messageId).bodySize;
     }
 
-    public ContentBody getContentBody(Long messageId, int index) throws AMQException, IllegalArgumentException
+    public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
     {
         if (index > _contentBodies.size() - 1)
         {
@@ -80,7 +80,7 @@
         _contentBodies.add(contentBody);
     }
 
-    public BasicPublishBody getPublishBody(Long messageId) throws AMQException
+    public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException
     {
         return _publishBody;
     }
@@ -96,10 +96,10 @@
         _redelivered = redelivered;
     }
 
-    public boolean isPersistent(Long messageId) throws AMQException
+    public boolean isPersistent(StoreContext context, Long messageId) throws AMQException
     {
         //todo remove literal values to a constant file such as AMQConstants in common
-        ContentHeaderBody chb = getContentHeaderBody(messageId);
+        ContentHeaderBody chb = getContentHeaderBody(context, messageId);
         return chb.properties instanceof BasicContentHeaderProperties &&
                ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Thu Feb 15 15:23:48 2007
@@ -267,9 +267,11 @@
                 if (_acks)
                 {
                     channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+                    msg.decrementReference(storeContext);
                 }
 
                 msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
+
             }
         }
         finally

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java Thu Feb 15 15:23:48 2007
@@ -56,21 +56,21 @@
         _messageStore = messageStore;
     }
 
-    public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException
+    public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException
     {
         ContentHeaderBody chb = (_contentHeaderBody != null ? _contentHeaderBody.get() : null);
         if (chb == null)
         {
-            MessageMetaData mmd = loadMessageMetaData(messageId);
+            MessageMetaData mmd = loadMessageMetaData(context, messageId);
             chb = mmd.getContentHeaderBody();
         }
         return chb;
     }
 
-    private MessageMetaData loadMessageMetaData(Long messageId)
+    private MessageMetaData loadMessageMetaData(StoreContext context, Long messageId)
             throws AMQException
     {
-        MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+        MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId);
         populateFromMessageMetaData(mmd);
         return mmd;
     }
@@ -82,11 +82,11 @@
         _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody());
     }
 
-    public int getBodyCount(Long messageId) throws AMQException
+    public int getBodyCount(StoreContext context, Long messageId) throws AMQException
     {
         if (_contentBodies == null)
         {
-            MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+            MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId);
             int chunkCount = mmd.getContentChunkCount();
             _contentBodies = new ArrayList<WeakReference<ContentBody>>(chunkCount);
             for (int i = 0; i < chunkCount; i++)
@@ -97,12 +97,12 @@
         return _contentBodies.size();
     }
 
-    public long getBodySize(Long messageId) throws AMQException
+    public long getBodySize(StoreContext context, Long messageId) throws AMQException
     {
-        return getContentHeaderBody(messageId).bodySize;
+        return getContentHeaderBody(context, messageId).bodySize;
     }
 
-    public ContentBody getContentBody(Long messageId, int index) throws AMQException, IllegalArgumentException
+    public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
     {
         if (index > _contentBodies.size() - 1)
         {
@@ -113,7 +113,7 @@
         ContentBody cb = wr.get();
         if (cb == null)
         {
-            cb = _messageStore.getContentBodyChunk(messageId, index);
+            cb = _messageStore.getContentBodyChunk(context, messageId, index);
             _contentBodies.set(index, new WeakReference<ContentBody>(cb));
         }
         return cb;
@@ -145,12 +145,12 @@
         _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody, isLastContentBody);
     }
 
-    public BasicPublishBody getPublishBody(Long messageId) throws AMQException
+    public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException
     {
         BasicPublishBody bpb = (_publishBody != null ? _publishBody.get() : null);
         if (bpb == null)
         {
-            MessageMetaData mmd = loadMessageMetaData(messageId);
+            MessageMetaData mmd = loadMessageMetaData(context, messageId);
 
             bpb = mmd.getPublishBody();
         }
@@ -167,10 +167,10 @@
         _redelivered = redelivered;
     }
 
-    public boolean isPersistent(Long messageId) throws AMQException
+    public boolean isPersistent(StoreContext context, Long messageId) throws AMQException
     {
         //todo remove literal values to a constant file such as AMQConstants in common
-        ContentHeaderBody chb = getContentHeaderBody(messageId);
+        ContentHeaderBody chb = getContentHeaderBody(context, messageId);
         return chb.properties instanceof BasicContentHeaderProperties &&
                ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Thu Feb 15 15:23:48 2007
@@ -174,12 +174,12 @@
         _metaDataMap.put(messageId, messageMetaData);
     }
 
-    public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
+    public MessageMetaData getMessageMetaData(StoreContext context,Long messageId) throws AMQException
     {
         return _metaDataMap.get(messageId);
     }
 
-    public ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException
+    public ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
     {
         List<ContentBody> bodyList = _contentBodyMap.get(messageId);
         return bodyList.get(index);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Thu Feb 15 15:23:48 2007
@@ -84,8 +84,8 @@
 
     void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException;
 
-    MessageMetaData getMessageMetaData(Long messageId) throws AMQException;
+    MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
 
-    ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException;
+    ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
 
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java Thu Feb 15 15:23:48 2007
@@ -20,6 +20,9 @@
  */
 package org.apache.qpid.server.store;
 
+import org.apache.log4j.Logger;
+
+
 /**
  * A context that the store can use to associate with a transactional context. For example, it could store
  * some kind of txn id.
@@ -28,8 +31,22 @@
  */
 public class StoreContext
 {
+
+    private static final Logger _logger = Logger.getLogger(StoreContext.class);
+
+    private String _name;
     private Object _payload;
 
+    public StoreContext()
+    {
+        _name = super.toString();
+    }
+
+    public StoreContext(String name)
+    {
+        _name = name;
+    }
+
     public Object getPayload()
     {
         return _payload;
@@ -37,6 +54,7 @@
 
     public void setPayload(Object payload)
     {
+        _logger.debug("["+_name+"] Setting payload: " + payload);
         _payload = payload;
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Thu Feb 15 15:23:48 2007
@@ -168,7 +168,7 @@
         {
             if (_log.isDebugEnabled())
             {
-                _log.debug("Starting transaction on message store");
+                _log.debug("Starting transaction on message store: " + this);
             }
             _messageStore.beginTran(_storeContext);
             _inTran = true;
@@ -179,7 +179,7 @@
     {
         if (_log.isDebugEnabled())
         {
-            _log.debug("Committing transactional context");
+            _log.debug("Committing transactional context: " + this);
         }
         if (_ackOp != null)
         {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java Thu Feb 15 15:23:48 2007
@@ -75,7 +75,7 @@
                         }
                         else
                         {
-                            URLHelper.parseError(0, transport.length(), "Unknown transport", url);
+                            throw URLHelper.parseError(0, transport.length(), "Unknown transport", url);
                         }
                     }
                 }
@@ -89,7 +89,7 @@
 
             if (transport == null)
             {
-                URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" +
+                throw URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" +
                                          " In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, "");
             }
 
@@ -144,7 +144,7 @@
                     }
                     else
                     {
-                        URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1,
+                        throw URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1,
                                              "Illegal character in port number", connection.toString());
                     }
 
@@ -172,7 +172,7 @@
                 throw(URLSyntaxException) uris;
             }
 
-            URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+            throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java Thu Feb 15 15:23:48 2007
@@ -68,7 +68,7 @@
                 String uid = AMQConnectionFactory.getUniqueClientID();
                 if (uid == null)
                 {
-                    URLHelper.parseError(-1, "Client Name not specified", fullURL);
+                    throw URLHelper.parseError(-1, "Client Name not specified", fullURL);
                 }
                 else
                 {
@@ -106,7 +106,7 @@
 
             if (userInfo == null)
             {
-                URLHelper.parseError(AMQ_PROTOCOL.length() + 3,
+                throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3,
                         "User information not found on url", fullURL);
             }
             else
@@ -126,11 +126,11 @@
                 int testIndex = start + authLength;
                 if (testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?')
                 {
-                    URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL);
+                    throw URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL);
                 }
                 else
                 {
-                    URLHelper.parseError(-1, "Virtual host not specified", fullURL);
+                    throw URLHelper.parseError(-1, "Virtual host not specified", fullURL);
                 }
 
             }
@@ -155,17 +155,17 @@
 
             if (slash == -1)
             {
-                URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+                throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
             }
             else
             {
                 if (slash != 0 && fullURL.charAt(slash - 1) == ':')
                 {
-                    URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
+                    throw URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
                 }
                 else
                 {
-                    URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL);
+                    throw URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL);
                 }
             }
 
@@ -180,7 +180,7 @@
 
         if (colonIndex == -1)
         {
-            URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
+            throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
                     "Null password in user information not allowed.", _url);
         }
         else

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Thu Feb 15 15:23:48 2007
@@ -76,7 +76,7 @@
         _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
         _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
         _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
-        _queueName = new AMQShortString(binding.getQueueName());
+        _queueName = binding.getQueueName() == null ? null : new AMQShortString(binding.getQueueName());
     }
 
     protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, AMQShortString queueName)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Thu Feb 15 15:23:48 2007
@@ -28,6 +28,7 @@
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.client.CustomJMSXProperty;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 
@@ -42,7 +43,7 @@
     /**
      * This constant represents the name of a property that is set when the message payload is null.
      */
-    private static final AMQShortString PAYLOAD_NULL_PROPERTY = new AMQShortString("JMS_QPID_NULL");
+    private static final AMQShortString PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.getShortStringName();
     private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
 
     public JMSTextMessage() throws JMSException

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java Thu Feb 15 15:23:48 2007
@@ -107,7 +107,7 @@
         public OneUseChannel(int channelId, VirtualHost virtualHost)
             throws AMQException
         {
-            super(channelId,
+            super(ClusteredProtocolSession.this,channelId,
                   virtualHost.getMessageStore(),
                   virtualHost.getExchangeRegistry());
         }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java Thu Feb 15 15:23:48 2007
@@ -26,9 +26,12 @@
 
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.log4j.Logger;
 
 public class AMQBindingURL implements BindingURL
 {
+    private static final Logger _logger = Logger.getLogger(AMQBindingURL.class);
+
     String _url;
     AMQShortString _exchangeClass;
     AMQShortString _exchangeName;
@@ -41,7 +44,7 @@
     {
         //format:
         // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
-
+        _logger.debug("Parsing URL: " + url);
         _url = url;
         _options = new HashMap<String, String>();
 
@@ -73,17 +76,19 @@
 
             if (exchangeName == null)
             {
-                URLHelper.parseError(-1, "Exchange Name not specified.", _url);
+                throw URLHelper.parseError(-1, "Exchange Name not specified.", _url);
             }
             else
             {
                 setExchangeName(exchangeName);
             }
 
+            String queueName;
+
             if (connection.getPath() == null ||
                     connection.getPath().equals(""))
             {
-                URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
+                throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
                         "Destination or Queue requried", _url);
             }
             else
@@ -91,7 +96,7 @@
                 int slash = connection.getPath().indexOf("/", 1);
                 if (slash == -1)
                 {
-                    URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
+                    throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
                             "Destination requried", _url);
                 }
                 else
@@ -99,7 +104,10 @@
                     String path = connection.getPath();
                     setDestinationName(path.substring(1, slash));
 
-                    setQueueName(path.substring(slash + 1));
+                    // We don't set queueName yet as the actual value we use depends on options set
+                    // when we are dealing with durable subscriptions
+
+                    queueName = path.substring(slash + 1);
 
                 }
             }
@@ -108,14 +116,19 @@
 
             processOptions();
 
+            // We can now call setQueueName as the URL is full parsed.
+
+            setQueueName(queueName);
+
             //Fragment is #string (not used)
             //System.out.println(connection.getFragment());
+            _logger.debug("URL Parsed: " + this);            
 
         }
         catch (URISyntaxException uris)
         {
 
-            URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+            throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
 
         }
     }
@@ -125,7 +138,7 @@
         setExchangeClass(new AMQShortString(exchangeClass));
     }
 
-    private void setQueueName(String name)
+    private void setQueueName(String name) throws URLSyntaxException
     {
         setQueueName(new AMQShortString(name));
     }
@@ -155,8 +168,9 @@
         return _exchangeClass;
     }
 
-    public void setExchangeClass(AMQShortString exchangeClass)
+    private void setExchangeClass(AMQShortString exchangeClass)
     {
+
         _exchangeClass = exchangeClass;
     }
 
@@ -165,7 +179,7 @@
         return _exchangeName;
     }
 
-    public void setExchangeName(AMQShortString name)
+    private void setExchangeName(AMQShortString name)
     {
         _exchangeName = name;
 
@@ -180,40 +194,43 @@
         return _destinationName;
     }
 
-    public void setDestinationName(AMQShortString name)
+    private void setDestinationName(AMQShortString name)
     {
         _destinationName = name;
     }
 
     public AMQShortString getQueueName()
     {
+        return _queueName;
+    }
+
+    public void setQueueName(AMQShortString name) throws URLSyntaxException
+    {
         if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
         {
             if (Boolean.parseBoolean(getOption(OPTION_DURABLE)))
             {
                 if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION))
                 {
-                    return new AMQShortString(getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION));
+                    _queueName = new AMQShortString(getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION));
                 }
                 else
                 {
-                    return getDestinationName();
+                    throw URLHelper.parseError(-1, "Durable subscription must have values for " + BindingURL.OPTION_CLIENTID + " and " + BindingURL.OPTION_SUBSCRIPTION + ".", _url);
+
                 }
             }
             else
             {
-                return getDestinationName();
+                _queueName = null;
             }
         }
         else
         {
-            return _queueName;
+            _queueName = name;
         }
-    }
 
-    public void setQueueName(AMQShortString name)
-    {
-        _queueName = name;
+
     }
 
     public String getOption(String key)

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java Thu Feb 15 15:23:48 2007
@@ -40,29 +40,17 @@
 
     AMQShortString getExchangeClass();
 
-    void setExchangeClass(AMQShortString name);
-
     AMQShortString getExchangeName();
 
-    void setExchangeName(AMQShortString name);
-
     AMQShortString getDestinationName();
 
-    void setDestinationName(AMQShortString name);
-
     AMQShortString getQueueName();
 
-    void setQueueName(AMQShortString name);
-
     String getOption(String key);
 
-    void setOption(String key, String value);
-
     boolean containsOption(String key);
 
     AMQShortString getRoutingKey();
-
-    void setRoutingKey(AMQShortString key);
 
     String toString();
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java Thu Feb 15 15:23:48 2007
@@ -114,11 +114,11 @@
 
             if (sepIndex >= options.length() || sepIndex == 0)
             {
-                parseError(valueIndex, "Unterminated option", options);
+                throw parseError(valueIndex, "Unterminated option", options);
             }
             else
             {
-                parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" +
+                throw parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" +
                         options.charAt(sepIndex) + "'", options);
             }
         }
@@ -136,14 +136,14 @@
     }
 
 
-    public static void parseError(int index, String error, String url) throws URLSyntaxException
+    public static URLSyntaxException parseError(int index, String error, String url)
     {
-        parseError(index, 1, error, url);
+        return parseError(index, 1, error, url);
     }
 
-    public static void parseError(int index, int length, String error, String url) throws URLSyntaxException
+    public static URLSyntaxException parseError(int index, int length, String error, String url)
     {
-        throw new URLSyntaxException(url, error, index, length);
+        return new URLSyntaxException(url, error, index, length);
     }
 
     public static String printOptions(HashMap<String, String> options)

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java Thu Feb 15 15:23:48 2007
@@ -58,7 +58,7 @@
         assertTrue(channelCount == 1);
         AMQQueue queue = new org.apache.qpid.server.queue.AMQQueue(new AMQShortString("testQueue_" + System.currentTimeMillis()),
                                                             false, new AMQShortString("test"), true, _virtualHost);
-        AMQChannel channel = new AMQChannel(2, _messageStore, null);
+        AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore, null);
         channel.setDefaultQueue(queue);
         _protocolSession.addChannel(channel);
         channelCount = _mbean.channels().size();
@@ -69,7 +69,7 @@
         assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L);
 
         // check APIs
-        AMQChannel channel3 = new AMQChannel(3, _messageStore, null);
+        AMQChannel channel3 = new AMQChannel(_protocolSession,3, _messageStore, null);
         channel3.setLocalTransactional();
         _protocolSession.addChannel(channel3);
         _mbean.rollbackTransactions(2);
@@ -89,14 +89,14 @@
         }
 
         // check if closing of session works
-        _protocolSession.addChannel(new AMQChannel(5, _messageStore, null));
+        _protocolSession.addChannel(new AMQChannel(_protocolSession,5, _messageStore, null));
         _mbean.closeConnection();
         try
         {
             channelCount = _mbean.channels().size();
             assertTrue(channelCount == 0);
             // session is now closed so adding another channel should throw an exception
-            _protocolSession.addChannel(new AMQChannel(6, _messageStore, null));
+            _protocolSession.addChannel(new AMQChannel(_protocolSession,6, _messageStore, null));
             fail();
         }
         catch(AMQException ex)
@@ -109,13 +109,14 @@
     protected void setUp() throws Exception
     {
         super.setUp();
-        _channel = new AMQChannel(1, _messageStore, null);
+
         IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
         _virtualHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test");
         _queueRegistry = _virtualHost.getQueueRegistry();
         _exchangeRegistry = _virtualHost.getExchangeRegistry();
         _mockIOSession = new MockIoSession();
         _protocolSession = new AMQMinaProtocolSession(_mockIOSession, appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true));
+        _channel = new AMQChannel(_protocolSession,1, _messageStore, null);
         _protocolSession.addChannel(_channel);
         _mbean = (AMQProtocolSessionMBean)_protocolSession.getManagedObject();
     }

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Thu Feb 15 15:23:48 2007
@@ -78,8 +78,9 @@
         assertFalse(mgr.hasActiveSubscribers());
         assertTrue(_queueMBean.getActiveConsumerCount() == 0);
 
-        _channel = new AMQChannel(1, _messageStore, null);
+
         _protocolSession = new MockProtocolSession(_messageStore);
+        _channel = new AMQChannel(_protocolSession, 1, _messageStore, null);
         _protocolSession.addChannel(_channel);
 
         _queue.registerProtocolSession(_protocolSession, 1, new AMQShortString("test"), false, null,false,false);

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Thu Feb 15 15:23:48 2007
@@ -75,8 +75,9 @@
     {
         super.setUp();
         _messageStore = new TestableMemoryMessageStore();
-        _channel = new AMQChannel(5, _messageStore, null/*dont need exchange registry*/);
         _protocolSession = new MockProtocolSession(_messageStore);
+        _channel = new AMQChannel(_protocolSession,5, _messageStore, null/*dont need exchange registry*/);
+
         _protocolSession.addChannel(_channel);
         _subscriptionManager = new SubscriptionSet();
         _queue = new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), _subscriptionManager);

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java Thu Feb 15 15:23:48 2007
@@ -97,12 +97,12 @@
 
     }
 
-    public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
+    public MessageMetaData getMessageMetaData(StoreContext s,Long messageId) throws AMQException
     {
         return null;
     }
 
-    public ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException
+    public ContentBody getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException
     {
         return null;
     }