You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2006/10/19 12:11:54 UTC

svn commit: r465549 - in /incubator/qpid/trunk/qpid/java: broker/src/org/apache/qpid/server/ broker/src/org/apache/qpid/server/ack/ broker/src/org/apache/qpid/server/queue/ broker/src/org/apache/qpid/server/util/ broker/test/src/org/apache/qpid/server/...

Author: gsim
Date: Thu Oct 19 03:11:47 2006
New Revision: 465549

URL: http://svn.apache.org/viewvc?view=rev&rev=465549
Log:
Further fixes and some extra tests for transactions.


Added:
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/TxAck.java   (with props)
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessage.java   (with props)
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java   (with props)
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/MessageCleanupException.java   (with props)
    incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/ack/
    incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java   (with props)
    incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/ack/UnitTests.java   (with props)
Removed:
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java
    incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
    incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java
    incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java?view=diff&rev=465549&r1=465548&r2=465549
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java Thu Oct 19 03:11:47 2006
@@ -23,6 +23,10 @@
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.ack.TxAck;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
 import org.apache.qpid.server.exchange.MessageRouter;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQMessage;
@@ -31,7 +35,6 @@
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.txn.TxnBuffer;
 import org.apache.qpid.server.txn.TxnOp;
-import org.apache.qpid.server.util.OrderedMapHelper;
 
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
@@ -93,36 +96,17 @@
 
     private Map<Long, UnacknowledgedMessage> _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
 
+    private long _lastDeliveryTag;
+
     private final AtomicBoolean _suspended = new AtomicBoolean(false);
 
     private final MessageRouter _exchanges;
 
     private final TxnBuffer _txnBuffer;
 
-    private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
-
-    public static class UnacknowledgedMessage
-    {
-        public final AMQMessage message;
-        public final String consumerTag;
-        public AMQQueue queue;
+    private TxAck ackOp;
 
-        public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, String consumerTag)
-        {
-            this.queue = queue;
-            this.message = message;
-            this.consumerTag = consumerTag;
-        }
-
-        private void discard() throws AMQException
-        {
-            if (queue != null)
-            {
-                message.dequeue(queue);
-            }
-            message.decrementReference();
-        }
-    }
+    private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
 
     public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
             throws AMQException
@@ -352,7 +336,8 @@
     {
         synchronized(_unacknowledgedMessageMapLock)
         {
-            _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag));
+            _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
+            _lastDeliveryTag = deliveryTag;
             checkSuspension();
         }
     }
@@ -444,13 +429,32 @@
     {
         if (_transactional)
         {
-            try
+            //check that the tag exists to give early failure
+            if(!multiple || deliveryTag > 0)
             {
-                _txnBuffer.enlist(new Ack(getUnackedMessageFinder().getValues(deliveryTag, multiple)));
+                checkAck(deliveryTag);
             }
-            catch(NoSuchElementException e)
+            //we use a single txn op for all acks and update this op
+            //as new acks come in. If this is the first ack in the txn
+            //we will need to create and enlist the op.
+            if(ackOp == null)
             {
-                throw new AMQException("Received ack for unrecognised delivery tag: " + deliveryTag);
+                ackOp = new TxAck(new AckMap());
+                _txnBuffer.enlist(ackOp);
+            }
+            //update the op to include this ack request
+            if(multiple && deliveryTag == 0)
+            {
+                synchronized(_unacknowledgedMessageMapLock)
+                {
+                    //if the client has signalled to ack all, that refers
+                    //only to the messages delivered up to this point
+                    ackOp.update(_lastDeliveryTag, multiple);
+                }
+            }
+            else
+            {
+                ackOp.update(deliveryTag, multiple);
             }
         }
         else
@@ -459,6 +463,17 @@
         }
     }
 
+    private void checkAck(long deliveryTag) throws AMQException
+    {
+        synchronized(_unacknowledgedMessageMapLock)
+        {
+            if (!_unacknowledgedMessageMap.containsKey(deliveryTag))
+            {
+                throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
+            }
+        }        
+    }
+
     private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException
     {
         if (multiple)
@@ -587,6 +602,16 @@
 
     public void commit() throws AMQException
     {
+        if(ackOp != null)
+        {
+            ackOp.consolidate();
+            if(ackOp.checkPersistent())
+            {
+                _txnBuffer.containsPersistentChanges();                    
+            }
+            ackOp = null;//already enlisted, after commit will reset regardless of outcome
+        }
+        
         _txnBuffer.commit();
         //TODO: may need to return 'immediate' messages at this point
     }
@@ -636,69 +661,22 @@
         _returns.clear();
     }
 
-    private OrderedMapHelper<Long, UnacknowledgedMessage> getUnackedMessageFinder()
-    { 
-        return new OrderedMapHelper<Long, UnacknowledgedMessage>(_unacknowledgedMessageMap, _unacknowledgedMessageMapLock, 0L);
-    }
-
-
-    private class Ack implements TxnOp
+    //we use this wrapper to ensure we are always using the correct
+    //map instance (it's not final, unfortunately)
+    private class AckMap implements UnacknowledgedMessageMap
     {
-        private final Map<Long, UnacknowledgedMessage> _unacked;
-
-        Ack(Map<Long, UnacknowledgedMessage> unacked) throws AMQException
+        public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
         {
-            _unacked = unacked;
-
-            //if any of the messages in unacked are persistent the txn
-            //buffer must be marked as persistent:
-            for(UnacknowledgedMessage msg : _unacked.values())
-            {
-                if(msg.message.isPersistent())
-                {
-                    _txnBuffer.containsPersistentChanges();
-                    break;
-                }
-            }
+            impl().collect(deliveryTag, multiple, msgs);
         }
-
-        public void prepare() throws AMQException
+        public void remove(List<UnacknowledgedMessage> msgs)
         {
-            //make persistent changes, i.e. dequeue and decrementReference
-            for(UnacknowledgedMessage msg : _unacked.values())
-            {
-                msg.discard();
-            }
-        }
-
-        public void undoPrepare()
-        {
-            //decrementReference is annoyingly untransactional (due to
-            //in memory counter) so if we failed in prepare for full
-            //txn, this op will have to compensate by fixing the count
-            //in memory (persistent changes will be rolled back by store) 
-            for(UnacknowledgedMessage msg : _unacked.values())
-            {
-
-                msg.message.incrementReference();
-            }            
-        }
-
-        public void commit()
-        {
-            //remove the unacked messages from the channels map
-            synchronized(_unacknowledgedMessageMapLock)
-            {
-                for(long tag : _unacked.keySet())
-                {
-                    _unacknowledgedMessageMap.remove(tag);
-                }
-            }
-            
+            impl().remove(msgs);
         }
 
-        public void rollback()
+        private UnacknowledgedMessageMap impl()
         {
+            return new UnacknowledgedMessageMapImpl(_unacknowledgedMessageMapLock, _unacknowledgedMessageMap);
         }
     }
 
@@ -745,17 +723,6 @@
 
         public void prepare() throws AMQException
         {
-            //the routers reference can now be released
-            _msg.decrementReference();
-            try
-            {
-                _msg.checkDeliveredToConsumer();
-            }
-            catch(NoConsumersException e)
-            {
-                //TODO: store this for delivery after the commit-ok
-                _returns.add(e.getReturnMessage(_channelId));
-            }
         }
 
         public void undoPrepare()
@@ -767,6 +734,27 @@
 
         public void commit()
         {
+            //The router's reference can now be released.  This is done
+            //here to ensure that it happens after the queues that
+            //enqueue it have incremented their counts (which as a
+            //memory only operation is done in the commit phase).
+            try
+            {
+                _msg.decrementReference();
+            }
+            catch(AMQException e)
+            {
+                _log.error("On commiting transaction, failed to cleanup unused message: " + e, e);
+            }
+            try
+            {
+                _msg.checkDeliveredToConsumer();
+            }
+            catch(NoConsumersException e)
+            {
+                //TODO: store this for delivery after the commit-ok
+                _returns.add(e.getReturnMessage(_channelId));
+            }
         }
 
         public void rollback()

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/TxAck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/TxAck.java?view=auto&rev=465549
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/TxAck.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/TxAck.java Thu Oct 19 03:11:47 2006
@@ -0,0 +1,129 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.txn.TxnOp;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * A TxnOp implementation for handling accumulated acks
+ */    
+public class TxAck implements TxnOp
+{
+    private final UnacknowledgedMessageMap _map;
+    private final List <UnacknowledgedMessage> _unacked = new LinkedList<UnacknowledgedMessage>();
+    private final List<Long> _individual = new LinkedList<Long>();
+    private long _deliveryTag;
+    private boolean _multiple;
+
+    public TxAck(UnacknowledgedMessageMap map)
+    {
+        _map = map;
+    }
+
+    public void update(long deliveryTag, boolean multiple)
+    {
+        if(!multiple)
+        {
+            //have acked a single message that is not part of
+            //the previously acked region so record
+            //individually
+            _individual.add(deliveryTag);//_multiple && !multiple
+        }
+        else if(deliveryTag > _deliveryTag)
+        {
+            //have simply moved the last acked message on a
+            //bit
+            _deliveryTag = deliveryTag;
+            _multiple = true;
+        }
+    }
+
+    public void consolidate()
+    {
+        //lookup all the unacked messages that have been acked in this transaction
+        if(_multiple)
+        {
+            //get all the unacked messages for the accumulated
+            //multiple acks
+            _map.collect(_deliveryTag, true, _unacked);
+        }
+        //get any unacked messages for individual acks outside the
+        //range covered by multiple acks
+        for(long tag : _individual)
+        {
+            if(_deliveryTag < tag)
+            {
+                _map.collect(tag, false, _unacked);                
+            }
+        }
+    }
+
+    public boolean checkPersistent() throws AMQException
+    {     
+        //if any of the messages in unacked are persistent the txn
+        //buffer must be marked as persistent:
+        for(UnacknowledgedMessage msg : _unacked)
+        {
+            if(msg.message.isPersistent())
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public void prepare() throws AMQException
+    {
+        //make persistent changes, i.e. dequeue and decrementReference
+        for(UnacknowledgedMessage msg : _unacked)
+        {
+            msg.discard();
+        }
+    }
+    
+    public void undoPrepare()
+    {
+        //decrementReference is annoyingly untransactional (due to
+        //in memory counter) so if we failed in prepare for full
+        //txn, this op will have to compensate by fixing the count
+        //in memory (persistent changes will be rolled back by store) 
+        for(UnacknowledgedMessage msg : _unacked)
+        {
+            msg.message.incrementReference();
+        }            
+    }
+
+    public void commit()
+    {
+        //remove the unacked messages from the channels map
+        _map.remove(_unacked);
+    }
+
+    public void rollback()
+    {
+    }
+}
+

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/TxAck.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessage.java?view=auto&rev=465549
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessage.java Thu Oct 19 03:11:47 2006
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class UnacknowledgedMessage
+{
+    public final AMQMessage message;
+    public final String consumerTag;
+    public final long deliveryTag;
+    public AMQQueue queue;
+    
+    public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, String consumerTag, long deliveryTag)
+    {
+        this.queue = queue;
+        this.message = message;
+        this.consumerTag = consumerTag;
+        this.deliveryTag = deliveryTag;
+    }
+
+    public void discard() throws AMQException
+    {
+        if (queue != null)
+        {
+            message.dequeue(queue);
+        }
+        message.decrementReference();
+    }
+}
+

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?view=auto&rev=465549
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java Thu Oct 19 03:11:47 2006
@@ -0,0 +1,27 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import java.util.List;
+
+public interface UnacknowledgedMessageMap
+{
+    public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs);
+    public void remove(List<UnacknowledgedMessage> msgs);
+}
+

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?view=auto&rev=465549
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Thu Oct 19 03:11:47 2006
@@ -0,0 +1,81 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import java.util.List;
+import java.util.Map;
+
+public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
+{
+    private final Object _lock;
+    private Map<Long, UnacknowledgedMessage> _map;
+
+    public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map)
+    {
+        _lock = lock;
+        _map = map;
+    }
+
+    public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
+    {
+        if (multiple)
+        {
+            collect(deliveryTag, msgs);
+        }
+        else
+        {
+            msgs.add(get(deliveryTag));
+        }
+
+    }
+
+    public void remove(List<UnacknowledgedMessage> msgs)
+    {
+        synchronized(_lock)
+        {
+            for(UnacknowledgedMessage msg : msgs)
+            {
+                _map.remove(msg.deliveryTag);
+            }            
+        }
+    }
+
+    private UnacknowledgedMessage get(long key)
+    {
+        synchronized(_lock)
+        {
+            return _map.get(key);
+        }
+    }
+
+    private void collect(long key, List<UnacknowledgedMessage> msgs)
+    {
+        synchronized(_lock)
+        {
+            for(Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
+            {
+                msgs.add(entry.getValue());
+                if (entry.getKey() == key)
+                {
+                    break;
+                }                        
+            }
+        }
+    }
+}
+

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=465549&r1=465548&r2=465549
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java Thu Oct 19 03:11:47 2006
@@ -255,7 +255,7 @@
      * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
      * message store.
      */
-    public void decrementReference() throws AMQException
+    public void decrementReference() throws MessageCleanupException
     {
         // note that the operation of decrementing the reference count and then removing the message does not
         // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
@@ -263,7 +263,16 @@
         // not relying on the all the increments having taken place before the delivery manager decrements.
         if (_referenceCount.decrementAndGet() == 0)
         {
-            _store.removeMessage(_messageId);
+            try
+            {
+                _store.removeMessage(_messageId);
+            }
+            catch(AMQException e)
+            {
+                //to maintain consistency, we revert the count
+                incrementReference();
+                throw new MessageCleanupException(_messageId, e);
+            }
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=465549&r1=465548&r2=465549
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java Thu Oct 19 03:11:47 2006
@@ -707,8 +707,16 @@
     {
         try
         {
-            msg.decrementReference();
             msg.dequeue(this);
+            msg.decrementReference();
+        }
+        catch(MessageCleanupException e)
+        {
+            //Message was dequeued, but could not then be deleted
+            //though it is no longer referenced. This should be very
+            //rare and can be detected and cleaned up on recovery or
+            //done through some form of manual intervention.
+            _logger.error(e, e);
         }
         catch(AMQException e)
         {
@@ -777,7 +785,8 @@
 
         public void prepare() throws AMQException
         {
-            record(_msg);
+            //do the persistent part of the record()
+            _msg.enqueue(AMQQueue.this);
         }
 
         public void undoPrepare()
@@ -786,6 +795,9 @@
 
         public void commit()
         {
+            //do the memory part of the record()
+            _msg.incrementReference();
+            //then process the message
             try
             {
                 process(_msg);
@@ -799,14 +811,6 @@
 
         public void rollback()
         {
-            try
-            {
-                _msg.decrementReference();
-            }
-            catch (AMQException e)
-            {
-                _logger.error("Error rolling back a queue delivery: " + e, e);
-            }
         }
     }
 

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/MessageCleanupException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/MessageCleanupException.java?view=auto&rev=465549
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/MessageCleanupException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/MessageCleanupException.java Thu Oct 19 03:11:47 2006
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * Signals that the removal of a message once its refcount reached
+ * zero failed.
+ */
+public class MessageCleanupException extends AMQException
+{
+    public MessageCleanupException(long messageId, AMQException e)
+    {
+        super("Failed to cleanup message with id " + messageId, e);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/MessageCleanupException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java?view=auto&rev=465549
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java Thu Oct 19 03:11:47 2006
@@ -0,0 +1,186 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import junit.framework.JUnit4TestAdapter;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.Ignore;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+/** Checks TxAck's prepare, undoPrepare and commit steps against three acknowledgement patterns. */
+public class TxAckTest
+{
+    private Scenario individual;
+    private Scenario multiple;
+    private Scenario combined;
+
+    @Before
+    public void setup() throws Exception
+    {
+        //ack only 5th msg
+        individual = new Scenario(10, Arrays.asList(5L), Arrays.asList(1L, 2L, 3L, 4L, 6L, 7L, 8L, 9L, 10L));
+        individual.update(5, false);
+
+        //ack all up to and including 5th msg
+        multiple = new Scenario(10, Arrays.asList(1L, 2L, 3L, 4L, 5L), Arrays.asList(6L, 7L, 8L, 9L, 10L));
+        multiple.update(5, true);
+
+        //leave only 8th and 9th unacked
+        combined = new Scenario(10, Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 10L), Arrays.asList(8L, 9L));
+        combined.update(3, false);
+        combined.update(5, true);
+        combined.update(7, true);
+        combined.update(2, true);//should be ignored (tag is below the multiple ack of 7 above)
+        combined.update(1, false);//should be ignored (tag is below the multiple ack of 7 above)
+        combined.update(10, false);
+    }
+
+    @Test
+    public void prepare() throws AMQException
+    {
+        individual.prepare();
+        multiple.prepare();
+        combined.prepare();
+    }
+
+    @Test
+    public void undoPrepare() throws AMQException
+    {
+        individual.undoPrepare();
+        multiple.undoPrepare();
+        combined.undoPrepare();
+    }
+
+    @Test
+    public void commit() throws AMQException
+    {
+        individual.commit();
+        multiple.commit();
+        combined.commit();
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(TxAckTest.class);
+    }
+
+    /** Runs one TxAck over a map of messageCount messages and holds the tags expected to end up acked or unacked. */
+    private static class Scenario
+    {
+        private final LinkedHashMap<Long, UnacknowledgedMessage> _messages = new LinkedHashMap<Long, UnacknowledgedMessage>();
+        private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(_messages, _messages);
+        private final TxAck _op = new TxAck(_map);
+        private final List<Long> _acked;
+        private final List<Long> _unacked;
+
+        Scenario(int messageCount, List<Long> acked, List<Long> unacked)
+        {
+            for(int i = 0; i < messageCount; i++)
+            {
+                long deliveryTag = i + 1;//delivery tags run from 1 to messageCount
+                _messages.put(deliveryTag, new UnacknowledgedMessage(null, new TestMessage(deliveryTag), null, deliveryTag));
+            }
+            _acked = acked;
+            _unacked = unacked;
+        }
+
+        void update(long deliverytag, boolean multiple)
+        {
+            _op.update(deliverytag, multiple);
+        }
+
+        private void assertCount(List<Long> tags, int expected)
+        {
+            for(long tag : tags)
+            {
+                UnacknowledgedMessage u = _messages.get(tag);
+                assertTrue("Message not found for tag " + tag, u != null);
+                ((TestMessage) u.message).assertCountEquals(expected);
+            }
+        }
+
+        void prepare() throws AMQException
+        {
+            _op.consolidate();
+            _op.prepare();
+            //expect one net decrementReference() on every acked message and none on the others
+            assertCount(_acked, -1);
+            assertCount(_unacked, 0);
+        }
+        void undoPrepare()
+        {
+            _op.consolidate();
+            _op.undoPrepare();
+            //expect one net incrementReference() on every acked message and none on the others
+            assertCount(_acked, 1);
+            assertCount(_unacked, 0);
+        }
+
+        void commit()
+        {
+            _op.consolidate();
+            _op.commit();
+            //check acked messages are removed from map
+            HashSet<Long> keys = new HashSet<Long>(_messages.keySet());
+            keys.retainAll(_acked);
+            assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty());
+            //check unacked messages are still in map
+            keys = new HashSet<Long>(_unacked);
+            keys.removeAll(_messages.keySet());
+            assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty());
+        }
+    }
+
+    /** AMQMessage subclass whose incrementReference()/decrementReference() only adjust a counter the test can check. */
+    private static class TestMessage extends AMQMessage
+    {
+        private final long _tag;
+        private int _count;
+
+        TestMessage(long tag)
+        {
+            super(new TestableMemoryMessageStore(), null);
+            _tag = tag;
+        }
+
+        public void incrementReference()
+        {
+            _count++;
+        }
+
+        public void decrementReference()
+        {
+            _count--;
+        }
+
+        void assertCountEquals(int expected)
+        {
+            assertEquals("Wrong count for message with tag " + _tag, expected, _count);
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/ack/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/ack/UnitTests.java?view=auto&rev=465549
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/ack/UnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/ack/UnitTests.java Thu Oct 19 03:11:47 2006
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/** Suite of the unit tests in org.apache.qpid.server.ack; currently it lists only TxAckTest. */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({TxAckTest.class})
+public class UnitTests
+{
+    /** Wraps this suite in a JUnit4TestAdapter so that it can be handed out as a junit.framework.Test. */
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(UnitTests.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/ack/UnitTests.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java?view=diff&rev=465549&r1=465548&r2=465549
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java Thu Oct 19 03:11:47 2006
@@ -26,6 +26,7 @@
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.util.TestApplicationRegistry;
@@ -93,15 +94,15 @@
         final int msgCount = 10;
         publishMessages(msgCount);
 
-        Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+        Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == msgCount);
 
-        Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+        Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
         for (int i = 1; i <= map.size(); i++)
         {
-            Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+            Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
             assertTrue(entry.getKey() == i);
-            AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+            UnacknowledgedMessage unackedMsg = entry.getValue();
             assertTrue(unackedMsg.queue == _queue);
         }
         assertTrue(_messageStore.getMessageMap().size() == msgCount);
@@ -118,7 +119,7 @@
         final int msgCount = 10;
         publishMessages(msgCount);
 
-        Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+        Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 0);
         assertTrue(_messageStore.getMessageMap().size() == 0);
     }
@@ -135,16 +136,16 @@
         publishMessages(msgCount);
 
         _channel.acknowledgeMessage(5, false);
-        Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+        Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == msgCount - 1);
 
-        Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+        Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
         int i = 1;
         while (i <= map.size())
         {
-            Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+            Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
             assertTrue(entry.getKey() == i);
-            AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+            UnacknowledgedMessage unackedMsg = entry.getValue();
             assertTrue(unackedMsg.queue == _queue);
             // 5 is the delivery tag of the message that *should* be removed
             if (++i == 5)
@@ -166,16 +167,16 @@
         publishMessages(msgCount);
 
         _channel.acknowledgeMessage(5, true);
-        Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+        Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 5);
 
-        Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+        Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
         int i = 1;
         while (i <= map.size())
         {
-            Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+            Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
             assertTrue(entry.getKey() == i + 5);
-            AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+            UnacknowledgedMessage unackedMsg = entry.getValue();
             assertTrue(unackedMsg.queue == _queue);
             ++i;
         }
@@ -193,16 +194,16 @@
         publishMessages(msgCount);
 
         _channel.acknowledgeMessage(0, true);
-        Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+        Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 0);
 
-        Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+        Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
         int i = 1;
         while (i <= map.size())
         {
-            Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+            Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
             assertTrue(entry.getKey() == i + 5);
-            AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+            UnacknowledgedMessage unackedMsg = entry.getValue();
             assertTrue(unackedMsg.queue == _queue);
             ++i;
         }
@@ -221,7 +222,7 @@
         // at this point we should have sent out only 5 messages with a further 5 queued
         // up in the channel which should be suspended
         assertTrue(_subscription.isSuspended());
-        Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+        Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 5);
         _channel.acknowledgeMessage(5, true);
         assertTrue(!_subscription.isSuspended());

Modified: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java?view=diff&rev=465549&r1=465548&r2=465549
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java Thu Oct 19 03:11:47 2006
@@ -22,7 +22,7 @@
 import org.junit.runners.Suite;
 
 @RunWith(Suite.class)
-@Suite.SuiteClasses({LoggingProxyTest.class, OrderedMapHelperTest.class})
+@Suite.SuiteClasses({LoggingProxyTest.class})
 public class UnitTests
 {
     public static junit.framework.Test suite()

Modified: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java?view=diff&rev=465549&r1=465548&r2=465549
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java Thu Oct 19 03:11:47 2006
@@ -120,6 +120,7 @@
         expect("Z", testConsumer2.receive(1000));
 
         assertTrue(null == testConsumer1.receive(1000));
+        assertTrue(null == testConsumer2.receive(1000));
     }
 
     @Test
@@ -140,6 +141,7 @@
         expect("B", consumer.receive(1000));
         expect("C", consumer.receive(1000));
 
+        assertTrue(null == testConsumer1.receive(1000));
         assertTrue(null == testConsumer2.receive(1000));
     }