You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2006/10/17 11:11:09 UTC

svn commit: r464867 - in /incubator/qpid/trunk/qpid/java: broker/src/org/apache/qpid/server/ broker/src/org/apache/qpid/server/handler/ broker/src/org/apache/qpid/server/queue/ broker/src/org/apache/qpid/server/txn/ broker/src/org/apache/qpid/server/ut...

Author: gsim
Date: Tue Oct 17 02:11:07 2006
New Revision: 464867

URL: http://svn.apache.org/viewvc?view=rev&rev=464867
Log:
Fixes to transaction handling.

- This line, and those below, will be ignored--

M    java/broker/test/src/org/apache/qpid/server/UnitTests.java
M    java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java
A    java/broker/test/src/org/apache/qpid/server/txn
AM   java/broker/test/src/org/apache/qpid/server/txn/UnitTests.java
AM   java/broker/test/src/org/apache/qpid/server/txn/TxnBufferTest.java
M    java/broker/test/src/org/apache/qpid/server/util/UnitTests.java
AM   java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java
M    java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
M    java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
AM   java/broker/src/org/apache/qpid/server/queue/FailedDequeueException.java
M    java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
M    java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
M    java/broker/src/org/apache/qpid/server/queue/Subscription.java
M    java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
M    java/broker/src/org/apache/qpid/server/AMQChannel.java
M    java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java
M    java/broker/src/org/apache/qpid/server/txn/TxnOp.java
AM   java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java
M    java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java
AM   java/client/test/src/org/apache/qpid/transacted/UnitTests.java
M    java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java

Added:
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/FailedDequeueException.java   (with props)
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java   (with props)
    incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/txn/
    incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/txn/TxnBufferTest.java   (with props)
    incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/txn/UnitTests.java   (with props)
    incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java   (with props)
    incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/transacted/UnitTests.java   (with props)
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/txn/TxnOp.java
    incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/UnitTests.java
    incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java
    incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java
    incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java?view=diff&rev=464867&r1=464866&r2=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java Tue Oct 17 02:11:07 2006
@@ -19,6 +19,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
@@ -26,16 +27,20 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.NoConsumersException;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.txn.TxnBuffer;
 import org.apache.qpid.server.txn.TxnOp;
+import org.apache.qpid.server.util.OrderedMapHelper;
 
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -94,6 +99,8 @@
 
     private final TxnBuffer _txnBuffer;
 
+    private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
+
     public static class UnacknowledgedMessage
     {
         public final AMQMessage message;
@@ -202,18 +209,53 @@
             //don't create a transaction unless needed
             if(_currentMessage.isPersistent())
             {
-                _txnBuffer.setPersistentMessageRecevied();
+                _txnBuffer.containsPersistentChanges();
             }
 
-            //don't route this until commit
-            _txnBuffer.enlist(new Publish(_currentMessage));
-            _currentMessage = null;
+            //A publication will result in the enlisting of several
+            //TxnOps. The first is an op that will store the message.
+            //Following that (and ordering is important), an op will
+            //be added for every queue onto which the message is
+            //enqueued. Finally a cleanup op will be added to decrement
+            //the reference associated with the routing.
+            Store storeOp = new Store(_currentMessage);
+            _txnBuffer.enlist(storeOp);
+            _currentMessage.setTxnBuffer(_txnBuffer);
+            try
+            {
+                _exchanges.routeContent(_currentMessage);
+                _txnBuffer.enlist(new Cleanup(_currentMessage));
+            }
+            catch(RequiredDeliveryException e)
+            {
+                //Can only be due to the mandatory flag, as no attempt
+                //has yet been made to deliver the message. The
+                //message will thus not have been delivered to any
+                //queue so we can return the message (without killing
+                //the transaction) and for efficiency remove the store
+                //operation from the buffer.
+                _txnBuffer.cancel(storeOp);
+                throw e;
+            }
+            finally
+            {
+                _currentMessage = null;
+            }
         }
         else
         {
-            _exchanges.routeContent(_currentMessage);
-            _currentMessage.decrementReference();
-            _currentMessage = null;
+            try
+            {
+                _exchanges.routeContent(_currentMessage);
+                //following check implements the functionality
+                //required by the 'immediate' flag:
+                _currentMessage.checkDeliveredToConsumer();
+            }
+            finally
+            {    
+                _currentMessage.decrementReference();
+                _currentMessage = null;
+            }
         }
     }
 
@@ -402,8 +444,14 @@
     {
         if (_transactional)
         {
-            //don't handle this until commit
-            _txnBuffer.enlist(new Ack(deliveryTag, multiple));
+            try
+            {
+                _txnBuffer.enlist(new Ack(getUnackedMessageFinder().getValues(deliveryTag, multiple)));
+            }
+            catch(NoSuchElementException e)
+            {
+                throw new AMQException("Received ack for unrecognised delivery tag: " + deliveryTag);
+            }
         }
         else
         {
@@ -540,6 +588,7 @@
     public void commit() throws AMQException
     {
         _txnBuffer.commit();
+        //TODO: may need to return 'immediate' messages at this point
     }
 
     public void rollback() throws AMQException
@@ -578,20 +627,74 @@
         return _defaultQueue;
     }
 
+    public void processReturns(AMQProtocolSession session)
+    {
+        for(AMQDataBlock block : _returns)
+        {
+            session.writeFrame(block);
+        }
+        _returns.clear();
+    }
+
+    private OrderedMapHelper<Long, UnacknowledgedMessage> getUnackedMessageFinder()
+    { 
+        return new OrderedMapHelper<Long, UnacknowledgedMessage>(_unacknowledgedMessageMap, _unacknowledgedMessageMapLock, 0L);
+    }
+
+
     private class Ack implements TxnOp
     {
-        private final long _msgId;
-        private final boolean _multi;
+        private final Map<Long, UnacknowledgedMessage> _unacked;
+
+        Ack(Map<Long, UnacknowledgedMessage> unacked) throws AMQException
+        {
+            _unacked = unacked;
+
+            //if any of the messages in unacked are persistent the txn
+            //buffer must be marked as persistent:
+            for(UnacknowledgedMessage msg : _unacked.values())
+            {
+                if(msg.message.isPersistent())
+                {
+                    _txnBuffer.containsPersistentChanges();
+                    break;
+                }
+            }
+        }
+
+        public void prepare() throws AMQException
+        {
+            //make persistent changes, i.e. dequeue and decrementReference
+            for(UnacknowledgedMessage msg : _unacked.values())
+            {
+                msg.discard();
+            }
+        }
 
-        Ack(long msgId, boolean multi)
+        public void undoPrepare()
         {
-            _msgId = msgId;
-            _multi = multi;
+            //decrementReference is annoyingly untransactional (due to
+            //in memory counter) so if we failed in prepare for full
+            //txn, this op will have to compensate by fixing the count
+            //in memory (persistent changes will be rolled back by store) 
+            for(UnacknowledgedMessage msg : _unacked.values())
+            {
+
+                msg.message.incrementReference();
+            }            
         }
 
-        public void commit() throws AMQException
+        public void commit()
         {
-            handleAcknowledgement(_msgId, _multi);
+            //remove the unacked messages from the channels map
+            synchronized(_unacknowledgedMessageMapLock)
+            {
+                for(long tag : _unacked.keySet())
+                {
+                    _unacknowledgedMessageMap.remove(tag);
+                }
+            }
+            
         }
 
         public void rollback()
@@ -599,39 +702,75 @@
         }
     }
 
-    //TODO:
-    //implement a scheme whereby messages can be stored on disk
-    //until commit, then reloaded...
-    private class Publish implements TxnOp
+    private class Store implements TxnOp
     {
+        //just use this to do a store of the message during the
+        //prepare phase. Any enqueueing etc is done by TxnOps enlisted
+        //by the queues themselves.
         private final AMQMessage _msg;
 
-        Publish(AMQMessage msg)
+        Store(AMQMessage msg)
         {
             _msg = msg;
         }
 
-        public boolean isPersistent() throws AMQException
+        public void prepare() throws AMQException
         {
-            return _msg.isPersistent();
+            _msg.storeMessage();
+            //the routers reference can now be released
+            _msg.decrementReference();
         }
 
-        public void commit() throws AMQException
+        public void undoPrepare()
+        {
+        }
+
+        public void commit()
         {
-            _exchanges.routeContent(_msg);
-            _msg.decrementReference();
         }
 
         public void rollback()
         {
+        }
+    }
+
+    private class Cleanup implements TxnOp
+    {
+        private final AMQMessage _msg;
+
+        Cleanup(AMQMessage msg)
+        {
+            _msg = msg;
+        }
+
+        public void prepare() throws AMQException
+        {
+            //the routers reference can now be released
+            _msg.decrementReference();
             try
             {
-                _msg.decrementReference();
+                _msg.checkDeliveredToConsumer();
             }
-            catch (AMQException e)
+            catch(NoConsumersException e)
             {
-                _log.error("Error rolling back a publish request: " + e, e);
+                //TODO: store this for delivery after the commit-ok
+                _returns.add(e.getReturnMessage(_channelId));
             }
+        }
+
+        public void undoPrepare()
+        {
+            //don't need to do anything here, if the store's txn failed
+            //when processing prepare then the message was not stored
+            //or enqueued on any queues and can be discarded
+        }
+
+        public void commit()
+        {
+        }
+
+        public void rollback()
+        {
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java?view=diff&rev=464867&r1=464866&r2=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java Tue Oct 17 02:11:07 2006
@@ -20,6 +20,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.TxCommitBody;
 import org.apache.qpid.framing.TxCommitOkBody;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQMethodEvent;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -46,8 +47,10 @@
     {
 
         try{
-            protocolSession.getChannel(evt.getChannelId()).commit();
-            protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId()));        
+            AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+            channel.commit();
+            protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId()));
+            channel.processReturns(protocolSession);            
         }catch(AMQException e){
             throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
         }

Modified: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=464867&r1=464866&r2=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java Tue Oct 17 02:11:07 2006
@@ -19,8 +19,9 @@
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.framing.*;
-import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.TxnBuffer;
 import org.apache.qpid.AMQException;
 
 import java.util.ArrayList;
@@ -61,17 +62,44 @@
      */
     private transient final MessageStore _store;
 
+    /**
+     * For non transactional publishes, a message can be stored as
+     * soon as it is complete. For transactional messages it doesnt
+     * need to be stored until the transaction is committed.
+     */
+    private boolean _storeWhenComplete;
+
+    /**
+     * TxnBuffer for transactionally published messages
+     */
+    private TxnBuffer _txnBuffer;
+
+    /**
+     * Flag to indicate whether message has been delivered to a
+     * consumer. Used in implementing return functionality for
+     * messages published with the 'immediate' flag.
+     */
+    private boolean _deliveredToConsumer;
+
+
     public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody)
     {
+        this(messageStore, publishBody, true);
+    }
+
+    public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody, boolean storeWhenComplete)
+    {
         _messageId = messageStore.getNewMessageId();
         _publishBody = publishBody;
         _store = messageStore;
         _contentBodies = new LinkedList<ContentBody>();
+        _storeWhenComplete = storeWhenComplete;
     }
 
     public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody,
                       ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
             throws AMQException
+    
     {
         _publishBody = publishBody;
         _contentHeaderBody = contentHeaderBody;
@@ -93,7 +121,7 @@
         this(msg._store, msg._messageId, msg._publishBody, msg._contentHeaderBody, msg._contentBodies);
     }
 
-    private void storeMessage() throws AMQException
+    public void storeMessage() throws AMQException
     {
         if (isPersistent())
         {
@@ -149,7 +177,7 @@
     public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
     {
         _contentHeaderBody = contentHeaderBody;
-        if (isAllContentReceived())
+        if (_storeWhenComplete && isAllContentReceived())
         {
             storeMessage();
         }
@@ -169,7 +197,7 @@
     {
         _contentBodies.add(contentBody);
         _bodyLengthReceived += contentBody.getSize();
-        if (isAllContentReceived())
+        if (_storeWhenComplete && isAllContentReceived())
         {
             storeMessage();
         }
@@ -293,5 +321,36 @@
         //todo remove literal values to a constant file such as AMQConstants in common
         return _contentHeaderBody.properties instanceof BasicContentHeaderProperties
                 &&((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+    }
+
+    public void setTxnBuffer(TxnBuffer buffer)
+    {
+        _txnBuffer = buffer;
+    }
+
+    public TxnBuffer getTxnBuffer()
+    {
+        return _txnBuffer;
+    }
+
+    /**
+     * Called to enforce the 'immediate' flag. 
+     * @throws NoConsumersException if the message is marked for
+     * immediate delivery but has not been marked as delivered to a
+     * consumer
+     */
+    public void checkDeliveredToConsumer() throws NoConsumersException{
+        if(isImmediate() && !_deliveredToConsumer)
+        {
+            throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies);
+        }
+    }
+
+    /**
+     * Called when this message is delivered to a consumer. (used to
+     * implement the 'immediate' flag functionality).
+     */
+    public void setDeliveredToConsumer(){
+        _deliveredToConsumer = true;
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=464867&r1=464866&r2=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java Tue Oct 17 02:11:07 2006
@@ -27,6 +27,8 @@
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.txn.TxnBuffer;
+import org.apache.qpid.server.txn.TxnOp;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
@@ -610,11 +612,55 @@
 
     public void deliver(AMQMessage msg) throws AMQException
     {
+        TxnBuffer buffer = msg.getTxnBuffer();
+        if(buffer == null)
+        {
+            //non-transactional
+            record(msg);
+            process(msg);
+        }
+        else
+        {
+            buffer.enlist(new Deliver(msg));
+        }
+    }
+
+    private void record(AMQMessage msg) throws AMQException
+    {
         msg.enqueue(this);
+        msg.incrementReference();
+    }
+
+    private void process(AMQMessage msg) throws FailedDequeueException
+    {
         _deliveryMgr.deliver(getName(), msg);
-        updateReceivedMessageCount();
+        try
+        {
+            msg.checkDeliveredToConsumer();
+            updateReceivedMessageCount();
+        }
+        catch(NoConsumersException e)
+        {
+            // as this message will be returned, it should be removed
+            // from the queue:
+            dequeue(msg);
+        }
+
     }
 
+    void dequeue(AMQMessage msg) throws FailedDequeueException
+    {
+        try
+        {
+            msg.decrementReference();                
+            msg.dequeue(this);
+        }
+        catch(AMQException e)
+        {
+            throw new FailedDequeueException(_name, e);
+        }
+    }
+    
     public void deliverAsync()
     {
         _deliveryMgr.processAsync(_asyncDelivery);
@@ -664,4 +710,49 @@
             _logger.debug(MessageFormat.format(msg, args));
         }
     }
+
+    private class Deliver implements TxnOp
+    {
+        private final AMQMessage _msg;
+
+        Deliver(AMQMessage msg)
+        {
+            _msg = msg;
+        }
+
+        public void prepare() throws AMQException
+        {
+            record(_msg);
+        }
+
+        public void undoPrepare()
+        {
+        }
+
+        public void commit()
+        {
+            try
+            {
+                process(_msg);
+            }
+            catch(FailedDequeueException e)
+            {
+                //TODO: is there anything else we can do here? I think not...
+                _logger.error("Error during commit of a queue delivery: " + e, e);
+            }
+        }
+
+        public void rollback()
+        {
+            try
+            {
+                _msg.decrementReference();
+            }
+            catch (AMQException e)
+            {
+                _logger.error("Error rolling back a queue delivery: " + e, e);
+            }
+        }
+    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=464867&r1=464866&r2=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java Tue Oct 17 02:11:07 2006
@@ -72,8 +72,16 @@
     {
         if (_queueing)
         {
-            _messages.offer(msg);
-            return true;
+            if(msg.isImmediate())
+            {
+                //can't enqueue messages for whom immediate delivery is required
+                return false;
+            }
+            else
+            {
+                _messages.offer(msg);
+                return true;
+            }
         }
         else
         {
@@ -147,14 +155,7 @@
                 //We don't synchronize access to subscribers so need to re-check
                 if (next != null)
                 {
-                    try
-                    {
-                        next.send(poll(), _queue);
-                    }
-                    catch (AMQException e)
-                    {
-                        _log.error("Unable to deliver message: " + e, e);
-                    }
+                    next.send(poll(), _queue);
                 }
                 else
                 {
@@ -162,6 +163,10 @@
                 }
             }
         }
+        catch (FailedDequeueException e)
+        {
+            _log.error("Unable to deliver message as dequeue failed: " + e, e);
+        }
         finally
         {
             _processing.set(false);
@@ -211,10 +216,10 @@
      * @param msg  the message to deliver
      * @throws NoConsumersException if there are no active subscribers to deliver
      *                              the message to
+     * @throws FailedDequeueException if the message could not be dequeued
      */
-    void deliver(String name, AMQMessage msg) throws AMQException
+    void deliver(String name, AMQMessage msg) throws FailedDequeueException
     {
-        msg.incrementReference();
         // first check whether we are queueing, and enqueue if we are
         if (!enqueue(msg))
         {
@@ -222,11 +227,7 @@
             Subscription s =  _subscriptions.nextSubscriber(msg);
             if (s == null)
             {
-                if (msg.isImmediate())
-                {
-                    throw msg.getNoConsumersException(name);
-                }
-                else
+                if (!msg.isImmediate())
                 {
                     // no subscribers yet so enter 'queueing' mode and queue this message
                     startQueueing(msg);
@@ -235,19 +236,7 @@
             else
             {
                 s.send(msg, _queue);
-            }
-        }
-
-        else
-        {
-            if (msg.isImmediate())
-            {
-                //todo check with spec to see if enqueing for immediate client delivery is ok.
-                Subscription s = _subscriptions.nextSubscriber(msg);
-                if (s == null)
-                {
-                    throw msg.getNoConsumersException(name);
-                }
+                msg.setDeliveredToConsumer();
             }
         }
     }

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/FailedDequeueException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/FailedDequeueException.java?view=auto&rev=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/FailedDequeueException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/FailedDequeueException.java Tue Oct 17 02:11:07 2006
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * Signals that the dequeue of a message from a queue failed
+ */
+public class FailedDequeueException extends AMQException
+{
+    public FailedDequeueException(String queue)
+    {
+        super("Failed to dequeue message from " + queue);
+    }
+
+    public FailedDequeueException(String queue, AMQException e)
+    {
+        super("Failed to dequeue message from " + queue, e);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/FailedDequeueException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java?view=diff&rev=464867&r1=464866&r2=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java Tue Oct 17 02:11:07 2006
@@ -40,6 +40,13 @@
         super("Immediate delivery to " + queue + " is not possible.", publishBody, contentHeaderBody, contentBodies);
     }
 
+    public NoConsumersException(BasicPublishBody publishBody,
+                                ContentHeaderBody contentHeaderBody,
+                                List<ContentBody> contentBodies)
+    {
+        super("Immediate delivery is not possible.", publishBody, contentHeaderBody, contentBodies);
+    }
+
     public int getReplyCode()
     {
         return AMQConstant.NO_CONSUMERS.getCode();

Modified: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=464867&r1=464866&r2=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java Tue Oct 17 02:11:07 2006
@@ -21,7 +21,7 @@
 
 public interface Subscription
 {
-    void send(AMQMessage msg, AMQQueue queue) throws AMQException;
+    void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException;
 
     boolean isSuspended();
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=464867&r1=464866&r2=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Oct 17 02:11:07 2006
@@ -122,10 +122,23 @@
      * @param queue
      * @throws AMQException
      */
-    public void send(AMQMessage msg, AMQQueue queue) throws AMQException
+    public void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
     {
         if (msg != null)
         {
+            // if we do not need to wait for client acknowledgements
+            // we can decrement the reference count immediately. 
+            
+            // By doing this _before_ the send we ensure that it
+            // doesn't get sent if it can't be dequeued, preventing
+            // duplicate delivery on recovery.
+
+            // The send may of course still fail, in which case, as
+            // the message is unacked, it will be lost.
+            if (!_acks)
+            {
+                queue.dequeue(msg);
+            }
             synchronized(channel)
             {
                 long deliveryTag = channel.getNextDeliveryTag();
@@ -139,13 +152,6 @@
                 AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
 
                 protocolSession.writeFrame(frame);
-            }
-            // if we do not need to wait for client acknowledgements we can decrement
-            // the reference count immediately
-            if (!_acks)
-            {
-                msg.decrementReference();
-                msg.dequeue(queue);
             }
         }
         else

Modified: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java?view=diff&rev=464867&r1=464866&r2=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java Tue Oct 17 02:11:07 2006
@@ -24,9 +24,13 @@
 import java.util.ArrayList;
 import java.util.List;
 
+/**
+ * Holds a list of TxnOp instance representing transactional
+ * operations. 
+ */
 public class TxnBuffer
 {
-    private boolean _persistentMessageRecevied = false;
+    private boolean _containsPersistentChanges = false;
     private final MessageStore _store;
     private final List<TxnOp> _ops = new ArrayList<TxnOp>();
     private static final Logger _log = Logger.getLogger(TxnBuffer.class);
@@ -36,45 +40,64 @@
         _store = store;
     }
 
-    public void setPersistentMessageRecevied()
+    public void containsPersistentChanges()
     {
-        _persistentMessageRecevied = true;
+        _containsPersistentChanges = true;
     }
 
     public void commit() throws AMQException
     {
-        if (_persistentMessageRecevied)
+        if (_containsPersistentChanges)
         {
             _log.debug("Begin Transaction.");
             _store.beginTran();
-        }
-        boolean failed = true;
-        try
-        {
-            for (TxnOp op : _ops)
+            if(prepare())
             {
-                op.commit();
+                _log.debug("Transaction Succeeded");
+                _store.commitTran();
+                for (TxnOp op : _ops)
+                {
+                    op.commit();
+                }
             }
-            _ops.clear();
-            failed = false;
-        }
-        finally
-        {
-            if (_persistentMessageRecevied)
+            else
             {
-                if (failed)
+                _log.debug("Transaction Failed");
+                _store.abortTran();
+            }
+        }else{
+            if(prepare())
+            {
+                for (TxnOp op : _ops)
                 {
-                    _log.debug("Transaction Failed");
-                    _store.abortTran();
+                    op.commit();
                 }
-                else
+            }            
+        }
+        _ops.clear();
+    }
+
+    private boolean prepare() 
+    {        
+        for (int i = 0; i < _ops.size(); i++)
+        {
+            TxnOp op = _ops.get(i);
+            try
+            {
+                op.prepare();
+            }
+            catch(Exception e)
+            {
+                //compensate previously prepared ops
+                for(int j = 0; j < i; j++)
                 {
-                    _log.debug("Transaction Succeeded");
-                    _store.commitTran();
-                }
+                    _ops.get(j).undoPrepare();
+                }    
+                return false;
             }
         }
-    }
+        return true;
+    }   
 
     public void rollback() throws AMQException
     {
@@ -88,5 +111,10 @@
     public void enlist(TxnOp op)
     {
         _ops.add(op);
+    }
+
+    public void cancel(TxnOp op)
+    {
+        _ops.remove(op);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/txn/TxnOp.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/txn/TxnOp.java?view=diff&rev=464867&r1=464866&r2=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/txn/TxnOp.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/txn/TxnOp.java Tue Oct 17 02:11:07 2006
@@ -19,8 +19,33 @@
 
 import org.apache.qpid.AMQException;
 
+/**
+ * This provides the abstraction of an individual operation within a
+ * transaction. It is used by the TxnBuffer class.
+ */
 public interface TxnOp
 {
-    public void commit() throws AMQException;
+    /**
+     * Do the part of the operation that updates persistent state 
+     */
+    public void prepare() throws AMQException;
+    /**
+     * Complete the operation started by prepare. Can now update in
+     * memory state or make netork transfers.
+     */
+    public void commit();
+    /**
+     * This is not the same as rollback. Unfortunately the use of an
+     * in memory reference count as a locking mechanism and a test for
+     * whether a message should be deleted means that as things are,
+     * handling an acknowledgement unavoidably alters both memory and
+     * persistent state on prepare. This is needed to 'compensate' or
+     * undo the in-memory change if the peristent update of later ops
+     * fails.
+     */
+    public void undoPrepare();
+    /**
+     * Rolls back the operation.
+     */
     public void rollback();
 }

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java?view=auto&rev=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java Tue Oct 17 02:11:07 2006
@@ -0,0 +1,116 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import java.util.List;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Utility class used by AMQChannel to retrieve unacknowledged
+ * messages. Made generic to avoid exposing the inner class in
+ * AMQChannel. Put in this package to keep ot out the way.
+ */
+public class OrderedMapHelper<K, V>
+{
+    private final Map<K, V> _map;
+    private final Object _lock;
+    private final K _wildcard;
+
+    public OrderedMapHelper(Map<K, V> map, Object lock, K wildcard)
+    {
+        _map = map;
+        _lock = lock;
+        _wildcard = wildcard;
+    }
+
+    /**
+     * Assumes the map passed in is ordered.  Returns a copy of the
+     * map containing an individual key-value pair or a list of
+     * key-values upto and including the one matching the given
+     * key. If multiple == true and the key == the wildcard specified
+     * on construction, then all the values in the map will be
+     * returned.
+     */
+    public Map<K, V> getValues(K key, boolean multiple) throws NoSuchElementException
+    {
+        if (multiple)
+        {
+            if(key == _wildcard)
+            {
+                synchronized(_lock)
+                {
+                    return new LinkedHashMap<K, V>(_map);
+                }
+            }
+            else
+            {
+                return getValues(key);
+            }
+        }
+        else
+        {
+            Map<K, V> values = new LinkedHashMap<K, V>();
+            values.put(key, getValue(key));
+            return values;
+        }
+    }
+
+    private V getValue(K key) throws NoSuchElementException
+    {
+        V value;
+        synchronized(_lock)
+        {
+            value = _map.get(key);
+        }
+
+        if(value == null)
+        {
+            throw new NoSuchElementException();
+        }
+        else
+        {
+            return value;
+        }
+    }
+
+    private Map<K, V> getValues(K key) throws NoSuchElementException
+    {
+        Map<K, V> values = new LinkedHashMap<K, V>();
+        synchronized(_lock)
+        {
+            if (!_map.containsKey(key))
+            {
+                throw new NoSuchElementException();
+            }
+            
+            for(Map.Entry<K, V> entry : _map.entrySet())
+            {
+                values.put(entry.getKey(), entry.getValue());
+                if (entry.getKey() == key)
+                {
+                    break;
+                }                        
+            }
+        }
+        return values;
+    }
+
+}
\ No newline at end of file

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/UnitTests.java?view=diff&rev=464867&r1=464866&r2=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/UnitTests.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/UnitTests.java Tue Oct 17 02:11:07 2006
@@ -28,6 +28,7 @@
         org.apache.qpid.server.protocol.UnitTests.class,
         org.apache.qpid.server.queue.UnitTests.class,
         org.apache.qpid.server.store.UnitTests.class,
+        org.apache.qpid.server.txn.UnitTests.class,
         org.apache.qpid.server.util.UnitTests.class
         })
 public class UnitTests

Modified: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java?view=diff&rev=464867&r1=464866&r2=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java Tue Oct 17 02:11:07 2006
@@ -138,7 +138,9 @@
     @Test (expected=NoConsumersException.class)
     public void noConsumers() throws AMQException
     {
-        _mgr.deliver("Me", message(true));
+        AMQMessage msg = message(true);
+        _mgr.deliver("Me", msg);
+        msg.checkDeliveredToConsumer();        
     }
 
     @Test (expected=NoConsumersException.class)
@@ -147,7 +149,9 @@
         TestSubscription s = new TestSubscription("A");
         _subscriptions.addSubscriber(s);
         s.setSuspended(true);
-        _mgr.deliver("Me", message(true));
+        AMQMessage msg = message(true);
+        _mgr.deliver("Me", msg);
+        msg.checkDeliveredToConsumer();
     }
 
     public static junit.framework.Test suite()

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/txn/TxnBufferTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/txn/TxnBufferTest.java?view=auto&rev=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/txn/TxnBufferTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/txn/TxnBufferTest.java Tue Oct 17 02:11:07 2006
@@ -0,0 +1,305 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import junit.framework.JUnit4TestAdapter;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.Ignore;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+
+import java.util.LinkedList;
+
+public class TxnBufferTest
+{
+    private final LinkedList<MockOp> ops = new LinkedList<MockOp>();  
+
+    @Before
+    public void setup() throws Exception
+    {
+    }
+
+    @Test
+    public void commit() throws AMQException
+    {
+        MockStore store = new MockStore();
+
+        TxnBuffer buffer = new TxnBuffer(store);
+        buffer.enlist(new MockOp().expectPrepare().expectCommit());
+        //check relative ordering
+        MockOp op = new MockOp().expectPrepare().expectPrepare().expectCommit().expectCommit();
+        buffer.enlist(op);
+        buffer.enlist(op);
+        buffer.enlist(new MockOp().expectPrepare().expectCommit());
+
+        buffer.commit();
+
+        validateOps();
+        store.validate();
+    }
+
+    @Test
+    public void rollback() throws AMQException
+    {
+        MockStore store = new MockStore();
+
+        TxnBuffer buffer = new TxnBuffer(store);
+        buffer.enlist(new MockOp().expectRollback());
+        buffer.enlist(new MockOp().expectRollback());
+        buffer.enlist(new MockOp().expectRollback());
+
+        buffer.rollback();
+
+        validateOps();
+        store.validate();
+    }
+
+    @Test
+    public void commitWithFailureDuringPrepare() throws AMQException
+    {
+        MockStore store = new MockStore();
+        store.expectBegin().expectAbort();
+
+        TxnBuffer buffer = new TxnBuffer(store);
+        buffer.containsPersistentChanges();
+        buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare());
+        buffer.enlist(new TxnTester(store));
+        buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare());
+        buffer.enlist(new FailedPrepare());
+        buffer.enlist(new MockOp());
+
+        buffer.commit();        
+        validateOps();
+        store.validate();
+    }
+
+    @Test
+    public void commitWithPersistance() throws AMQException
+    {
+        MockStore store = new MockStore();
+        store.expectBegin().expectCommit();
+
+        TxnBuffer buffer = new TxnBuffer(store);
+        buffer.enlist(new MockOp().expectPrepare().expectCommit());
+        buffer.enlist(new MockOp().expectPrepare().expectCommit());
+        buffer.enlist(new MockOp().expectPrepare().expectCommit());
+        buffer.enlist(new TxnTester(store));
+        buffer.containsPersistentChanges();
+
+        buffer.commit();
+        validateOps();
+        store.validate();
+    }
+
+    private void validateOps()
+    {
+        for(MockOp op : ops)
+        {
+            op.validate();
+        }
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(TxnBufferTest.class);
+    }
+
+    class MockOp implements TxnOp
+    {        
+        final Object PREPARE = "PREPARE";
+        final Object COMMIT = "COMMIT";
+        final Object UNDO_PREPARE = "UNDO_PREPARE";
+        final Object ROLLBACK = "ROLLBACK";
+
+        private final LinkedList expected = new LinkedList();
+
+        MockOp()
+        {
+            ops.add(this);
+        }
+
+        public void prepare()
+        {
+            assertEquals(expected.removeLast(), PREPARE);
+        }
+
+        public void commit()
+        {
+            assertEquals(expected.removeLast(), COMMIT);
+        }
+
+        public void undoPrepare()
+        {
+            assertEquals(expected.removeLast(), UNDO_PREPARE);
+        }
+
+        public void rollback()
+        {
+            assertEquals(expected.removeLast(), ROLLBACK);
+        }
+
+        private MockOp expect(Object optype)
+        {
+            expected.addFirst(optype);
+            return this;
+        }
+
+        MockOp expectPrepare()
+        {
+            return expect(PREPARE);
+        }
+
+        MockOp expectCommit()
+        {
+            return expect(COMMIT);
+        }
+
+        MockOp expectUndoPrepare()
+        {
+            return expect(UNDO_PREPARE);
+        }
+
+        MockOp expectRollback()
+        {
+            return expect(ROLLBACK);
+        }
+
+        void validate()
+        {
+            assertEquals("Expected ops were not all invoked", new LinkedList(), expected);
+        }
+
+        void clear()
+        {
+            expected.clear();
+        }
+    }
+
+    class MockStore extends TestableMemoryMessageStore
+    {
+        final Object BEGIN = "BEGIN";
+        final Object ABORT = "ABORT";
+        final Object COMMIT = "COMMIT";
+
+        private final LinkedList expected = new LinkedList();
+        private boolean inTran;
+
+        public void beginTran() throws AMQException
+        {
+            assertEquals(expected.removeLast(), BEGIN);
+            inTran = true;
+        }
+        
+        public void commitTran() throws AMQException
+        {
+            assertEquals(expected.removeLast(), COMMIT);
+            inTran = false;
+        }
+        
+        public void abortTran() throws AMQException
+        {
+            assertEquals(expected.removeLast(), ABORT);
+            inTran = false;
+        }
+
+        public boolean inTran()
+        {
+            return inTran;
+        }
+
+        private MockStore expect(Object optype)
+        {
+            expected.addFirst(optype);
+            return this;
+        }
+
+        MockStore expectBegin()
+        {
+            return expect(BEGIN);
+        }
+
+        MockStore expectCommit()
+        {
+            return expect(COMMIT);
+        }
+
+        MockStore expectAbort()
+        {
+            return expect(ABORT);
+        }
+
+        void clear()
+        {
+            expected.clear();
+        }
+
+        void validate()
+        {
+            assertEquals("Expected ops were not all invoked", new LinkedList(), expected);
+        }
+    }
+
+    class NullOp implements TxnOp
+    {        
+        public void prepare() throws AMQException
+        {
+        }
+        public void commit()
+        {
+        }
+        public void undoPrepare()
+        {
+        }
+        public void rollback()
+        {
+        }
+    }
+
+    class FailedPrepare extends NullOp
+    {        
+        public void prepare() throws AMQException
+        {
+            throw new AMQException("Fail!");
+        }
+    }
+
+    class TxnTester extends NullOp
+    {        
+        private final MessageStore store;
+
+        TxnTester(MessageStore store)
+        {
+            this.store = store;
+        }
+
+        public void prepare() throws AMQException
+        {
+            assertTrue("Expected prepare to be performed under txn", store.inTran());
+        }
+
+        public void commit()
+        {
+            assertTrue("Expected commit not to be performed under txn", !store.inTran());
+        }
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/txn/TxnBufferTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/txn/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/txn/UnitTests.java?view=auto&rev=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/txn/UnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/txn/UnitTests.java Tue Oct 17 02:11:07 2006
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+        TxnBufferTest.class
+})
+public class UnitTests
+{
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(UnitTests.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/txn/UnitTests.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java?view=auto&rev=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java Tue Oct 17 02:11:07 2006
@@ -0,0 +1,95 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import junit.framework.JUnit4TestAdapter;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.Ignore;
+import org.apache.qpid.AMQException;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class OrderedMapHelperTest
+{
+    private final Object lock = new Object();
+    private final Map<Integer, String> map = new LinkedHashMap<Integer, String>();
+    private final OrderedMapHelper<Integer, String> helper = new OrderedMapHelper<Integer, String>(map, lock, 0);
+
+    @Before
+    public void setup() throws Exception
+    {
+        map.put(1, "One");
+        map.put(2, "Two");
+        map.put(5, "Five");
+        map.put(8, "Eight");
+        map.put(10, "Ten");
+    }
+
+    @Test
+    public void specific()
+    {
+        Map<Integer, String> slice = helper.getValues(5, false);
+        assertEquals(1, slice.size());
+        assertTrue(slice.containsKey(5));
+        assertTrue(slice.containsValue("Five"));
+        assertEquals("Five", slice.get(5));
+    }
+
+    @Test
+    public void multiple()
+    {
+        Map<Integer, String> slice = helper.getValues(5, true);
+        assertEquals(3, slice.size());
+
+        assertTrue(slice.containsKey(1));
+        assertTrue(slice.containsKey(2));
+        assertTrue(slice.containsKey(5));
+
+        assertEquals("One", slice.get(1));
+        assertEquals("Two", slice.get(2));
+        assertEquals("Five", slice.get(5));
+    }
+
+    @Test
+    public void all()
+    {
+        Map<Integer, String> slice = helper.getValues(0/*the 'wildcard'*/, true);
+        assertEquals(5, slice.size());
+
+        assertTrue(slice.containsKey(1));
+        assertTrue(slice.containsKey(2));
+        assertTrue(slice.containsKey(5));
+        assertTrue(slice.containsKey(8));
+        assertTrue(slice.containsKey(10));
+
+        assertEquals("One", slice.get(1));
+        assertEquals("Two", slice.get(2));
+        assertEquals("Five", slice.get(5));
+        assertEquals("Eight", slice.get(8));
+        assertEquals("Ten", slice.get(10));
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(OrderedMapHelperTest.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java?view=diff&rev=464867&r1=464866&r2=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java Tue Oct 17 02:11:07 2006
@@ -22,7 +22,7 @@
 import org.junit.runners.Suite;
 
 @RunWith(Suite.class)
-@Suite.SuiteClasses({LoggingProxyTest.class})
+@Suite.SuiteClasses({LoggingProxyTest.class, OrderedMapHelperTest.class})
 public class UnitTests
 {
     public static junit.framework.Test suite()

Modified: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java?view=diff&rev=464867&r1=464866&r2=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java (original)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java Tue Oct 17 02:11:07 2006
@@ -32,7 +32,8 @@
         org.apache.qpid.client.message.UnitTests.class,
         org.apache.qpid.forwardall.UnitTests.class,
         org.apache.qpid.destinationurl.UnitTests.class,
-        org.apache.qpid.jndi.referenceabletest.UnitTests.class
+        org.apache.qpid.jndi.referenceabletest.UnitTests.class,
+        org.apache.qpid.transacted.UnitTests.class
         })
 public class AllClientUnitTests
 {

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/transacted/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/transacted/UnitTests.java?view=auto&rev=464867
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/transacted/UnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/transacted/UnitTests.java Tue Oct 17 02:11:07 2006
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.transacted;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({TransactedTest.class})
+public class UnitTests
+{
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(UnitTests.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/transacted/UnitTests.java
------------------------------------------------------------------------------
    svn:eol-style = native