You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/02/20 15:50:02 UTC

svn commit: r746260 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/ main/java/org/apache/qpid/server/ack/ main/java/org/apache/qpid/server/handler/ main/java/org/apache/qpid/server/queue/ main/java/org/apache/qpid/server/store/...

Author: ritchiem
Date: Fri Feb 20 14:50:01 2009
New Revision: 746260

URL: http://svn.apache.org/viewvc?rev=746260&view=rev
Log:
QPID-1632 - Removal of reference counting and update to tests, TxAckTest was reduced in size as reference counting is now not modified until the transaction completes.
Replaced MessageReferenceCoutingTest with PersistentMessageTest and further tests wil be created when DerbyDBMessageStore is also updated.

Added:
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
      - copied, changed from r746259, qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
Removed:
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Feb 20 14:50:01 2009
@@ -499,7 +499,7 @@
             }
             else
             {
-                unacked.discard(_storeContext);
+                unacked.dequeueAndDelete(_storeContext);
             }
         }
 
@@ -555,7 +555,7 @@
                 _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity()
                           + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
 
-                unacked.discard(_storeContext);
+                unacked.dequeueAndDelete(_storeContext);
             }
         }
         else
@@ -712,7 +712,7 @@
                     {
                         try
                         {
-                            message.discard(_storeContext);
+                            message.dequeueAndDelete(_storeContext);
                             message.setQueueDeleted(true);
 
                         }
@@ -831,9 +831,7 @@
             {
                 AMQMessage message = bouncedMessage.getAMQMessage();
                 _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
-                                                                 new AMQShortString(bouncedMessage.getMessage()));
-
-                message.decrementReference(_storeContext);
+                                                                 new AMQShortString(bouncedMessage.getMessage()));                
             }
 
             _returnMessages.clear();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java Fri Feb 20 14:50:01 2009
@@ -82,13 +82,13 @@
                 }
                 else
                 {
-                    queueEntry.discard(_storeContext);
+                    queueEntry.dequeueAndDelete(_storeContext);
                     _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + queueEntry);
                 }
             }
             else
             {
-                queueEntry.discard(_storeContext);
+                queueEntry.dequeueAndDelete(_storeContext);
                 _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + queueEntry);
             }
         }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java Fri Feb 20 14:50:01 2009
@@ -56,14 +56,10 @@
 
     public void setMessage(final AMQMessage payload)
     {
-
-        // Increment the reference as this message is in the routing phase
-        // and so will have the ref decremented as routing fails.
         // we need to keep this message around so we can return it in the
-        // handler. So increment here.
-        payload.incrementReference(1);
+        // handler.
+        // Messages are all kept in memory now. Only queues can push messages out of memory.
         _amqMessage = payload;
-
     }
 
     public AMQMessage getAMQMessage()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java Fri Feb 20 14:50:01 2009
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.ack;
 
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
@@ -116,22 +115,20 @@
         //make persistent changes, i.e. dequeue and decrementReference
         for (QueueEntry msg : _unacked.values())
         {
-            //Message has been ack so discard it. This will dequeue and decrement the reference.
-            msg.discard(storeContext);
-
+            //Message has been ack so dequeueAndDelete it.
+            // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
+            // from the transaciton log
+            msg.dequeueAndDelete(storeContext);
         }
     }
 
     public void undoPrepare()
     {
-        //decrementReference is annoyingly untransactional (due to
-        //in memory counter) so if we failed in prepare for full
-        //txn, this op will have to compensate by fixing the count
-        //in memory (persistent changes will be rolled back by store)
-        for (QueueEntry msg : _unacked.values())
-        {
-            msg.getMessage().incrementReference(1);
-        }
+        //As this is transaction the above dequeueAndDelete will only request the message be dequeue from the
+        // transactionLog. Only when the transaction succesfully completes will it perform any
+        // update of the internal transactionLog reference counting and any resulting message data deletion.
+        // The success or failure of the data deletion is not important to this transaction only that the ack has been
+        // successfully recorded.
     }
 
     public void commit(StoreContext storeContext)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Fri Feb 20 14:50:01 2009
@@ -174,8 +174,10 @@
                                            " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
                 }
 
-                //Message has been ack so discard it. This will dequeue and decrement the reference.
-                unacked.getValue().discard(storeContext);
+                //Message has been ack so dequeueAndDelete it.
+                // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
+                // from the transaciton log                
+                unacked.getValue().dequeueAndDelete(storeContext);
 
                 it.remove();
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Fri Feb 20 14:50:01 2009
@@ -65,38 +65,38 @@
 
         long deliveryTag = body.getDeliveryTag();
 
-        QueueEntry message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+        QueueEntry queueEntry = channel.getUnacknowledgedMessageMap().get(deliveryTag);
 
-        if (message == null)
+        if (queueEntry == null)
         {
             _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
 //            throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
         }
         else
         {
-            if (message.isQueueDeleted())
+            if (queueEntry.isQueueDeleted())
             {
                 _logger.warn("Message's Queue as already been purged, unable to Reject. " +
                              "Dropping message should use Dead Letter Queue");
-                message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
-                if(message != null)
+                queueEntry = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
+                if(queueEntry != null)
                 {
-                    message.discard(channel.getStoreContext());
+                    queueEntry.dequeueAndDelete(channel.getStoreContext());
                 }
                 //sendtoDeadLetterQueue(msg)
                 return;
             }
 
-            if (!message.getMessage().isReferenced())
+            if (queueEntry.isDeleted())
             {
-                _logger.warn("Message as already been purged, unable to Reject.");
+                _logger.warn("QueueEntry as already been deleted, unable to Reject.");
                 return;
             }
 
 
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
+                _logger.debug("Rejecting: DT:" + deliveryTag + "-" + queueEntry.getMessage().debugIdentity() +
                               ": Requeue:" + body.getRequeue() +
                               //": Resend:" + evt.getMethod().resend +
                               " on channel:" + channel.debugIdentity());
@@ -105,7 +105,7 @@
             // If we haven't requested message to be resent to this consumer then reject it from ever getting it.
             //if (!evt.getMethod().resend)
             {
-                message.reject();
+                queueEntry.reject();
             }
 
             if (body.getRequeue())
@@ -115,7 +115,7 @@
             else
             {
                 _logger.warn("Dropping message as requeue not required and there is no dead letter queue");
-                 message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
+                 queueEntry = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
                 //sendtoDeadLetterQueue(AMQMessage message)
 //                message.queue = channel.getDefaultDeadLetterQueue();
 //                channel.requeue(deliveryTag);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Fri Feb 20 14:50:01 2009
@@ -114,17 +114,8 @@
             throws AMQException;
 
 
-    void removeMessage(StoreContext storeContext) throws AMQException;
 
     String toString();
 
     String debugIdentity();
-
-    // Reference counting methods
-
-    void decrementReference(StoreContext storeContext) throws MessageCleanupException;
-
-    boolean incrementReference(int queueCount);
-
-    boolean isReferenced();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Feb 20 14:50:01 2009
@@ -81,25 +81,18 @@
 
     boolean isDeleted();
 
-
     int delete() throws AMQException;
 
-
     QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
 
     void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
 
     void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
 
-
-
     boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
 
-    
-
     void addQueueDeleteTask(final Task task);
 
-
     List<QueueEntry> getMessagesOnTheQueue();
 
     List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Fri Feb 20 14:50:01 2009
@@ -156,8 +156,7 @@
             _logger.debug("Delivering message " + getMessageId() + " to " + _destinationQueues);
         }
 
-        try
-        {
+
             // first we allow the handle to know that the message has been fully received. This is useful if it is
             // maintaining any calculated values based on content chunks
             _message.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), _messagePublishInfo, getContentHeaderBody());
@@ -196,7 +195,6 @@
             {
                 int offset;
                 final int queueCount = _destinationQueues.size();
-                _message.incrementReference(queueCount);
                 if(queueCount == 1)
                 {
                     offset = 0;
@@ -222,12 +220,8 @@
             }
 
             return _message;
-        }
-        finally
-        {
-            // Remove refence for routing process . Reference count should now == delivered queue count
-            if(_message != null) _message.decrementReference(_txnContext.getStoreContext());
-        }
+      
+
 
     }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java Fri Feb 20 14:50:01 2009
@@ -58,13 +58,6 @@
     }
 
     @Override
-    public void removeMessage(StoreContext storeContext) throws AMQException
-    {
-        _log.info("PAMQM : removing message:" + _messageId);
-        _transactionLog.removeMessage(storeContext, _messageId);
-    }
-
-    @Override
     public boolean isPersistent()
     {
         return true;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Fri Feb 20 14:50:01 2009
@@ -49,7 +49,6 @@
         public abstract State getState();
     }
 
-
     public final class AvailableState extends EntryState
     {
 
@@ -59,7 +58,6 @@
         }
     }
 
-
     public final class DequeuedState extends EntryState
     {
 
@@ -69,7 +67,6 @@
         }
     }
 
-
     public final class DeletedState extends EntryState
     {
 
@@ -88,7 +85,6 @@
         }
     }
 
-
     public final class NonSubscriptionAcquiredState extends EntryState
     {
         public State getState()
@@ -106,7 +102,6 @@
             _subscription = subscription;
         }
 
-
         public State getState()
         {
             return State.ACQUIRED;
@@ -118,16 +113,12 @@
         }
     }
 
-
     final static EntryState AVAILABLE_STATE = new AvailableState();
     final static EntryState DELETED_STATE = new DeletedState();
     final static EntryState DEQUEUED_STATE = new DequeuedState();
     final static EntryState EXPIRED_STATE = new ExpiredState();
     final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
 
-
-
-
     AMQQueue getQueue();
 
     AMQMessage getMessage();
@@ -141,9 +132,11 @@
     boolean isAcquired();
 
     boolean acquire();
+
     boolean acquire(Subscription sub);
 
     boolean delete();
+
     boolean isDeleted();
 
     boolean acquiredBySubscription();
@@ -170,12 +163,21 @@
 
     void dequeue(final StoreContext storeContext) throws FailedDequeueException;
 
-    void dispose(final StoreContext storeContext) throws MessageCleanupException;
-
-    void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException;
+    /**
+     * Message has been ack so dequeueAndDelete it.
+     * If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
+     * from the transaciton log
+     *
+     * @param storeContext the transactional Context in which to perform the deletion
+     *
+     * @throws FailedDequeueException
+     * @throws MessageCleanupException
+     */
+    void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException;
 
     boolean isQueueDeleted();
 
     void addStateChangeListener(StateChangeListener listener);
+
     boolean removeStateChangeListener(StateChangeListener listener);
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Feb 20 14:50:01 2009
@@ -282,13 +282,12 @@
             }
 
             getQueue().dequeue(storeContext, this);
-            if(_stateChangeListeners != null)
+
+            if (_stateChangeListeners != null)
             {
-                notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
+                notifyStateChange(state.getState(), QueueEntry.State.DEQUEUED);
             }
-
         }
-
     }
 
     private void notifyStateChange(final State oldState, final State newState)
@@ -299,29 +298,15 @@
         }
     }
 
-    public void dispose(final StoreContext storeContext) throws MessageCleanupException
-    {
-        _log.info("QEI Disposing of message:" + getMessage().getMessageId() + ": state=" + _state);
-        if(delete())
-        {
-            _log.info("QEI delete message:" + getMessage().getMessageId());
-            getMessage().decrementReference(storeContext);
-        }
-        else
-        {
-            _log.info("QEI delete state wrong:" + getMessage().getMessageId());
-        }
-    }
-
-    public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+    public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException
     {
-        //if the queue is null then the message is waiting to be acked, but has been removed.
+        //if the queue is null (i.e. queue.delete()'d) then the message is waiting to be acked, but has already be delete()'d;
         if (getQueue() != null)
         {
             dequeue(storeContext);
         }
 
-        dispose(storeContext);
+        delete();
     }
 
     public boolean isQueueDeleted()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Fri Feb 20 14:50:01 2009
@@ -408,8 +408,11 @@
 
         if (entry.immediateAndNotDelivered())
         {
-            dequeue(storeContext, entry);
-            entry.dispose(storeContext);
+            //We acquire the message here to ensure that the dequeueAndDelete will correctly remove the content
+            // from the transactionLog. This saves us from having to have a custom dequeueAndDelete that checks
+            // for the AVAILABLE state of an entry rather than the ACQUIRED state that it currently uses. 
+            entry.acquire();
+            entry.dequeueAndDelete(storeContext);
         }
         else if (!(entry.isAcquired() || entry.isDeleted()))
         {
@@ -562,6 +565,12 @@
 
     }
 
+    /**
+     * Only call from queue Entry
+     * @param storeContext
+     * @param entry
+     * @throws FailedDequeueException
+     */
     public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
     {
         decrementQueueCount();
@@ -578,7 +587,6 @@
             {
                 _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, msg.getMessageId());
             }
-            //entry.dispose(storeContext);
 
         }
         catch (MessageCleanupException e)
@@ -814,11 +822,18 @@
 
     }
 
+
     public void moveMessagesToAnotherQueue(final long fromMessageId,
                                            final long toMessageId,
                                            String queueName,
                                            StoreContext storeContext)
     {
+        // The move is a two step process. First the messages are moved in the _transactionLog.
+        // That is persistent messages are moved queues on disk for recovery and the QueueEntries removed from the
+        // existing queue.
+        // This is done as Queue.enqueue() does not write the data to the transactionLog. In normal message delivery
+        // this is done as the message is recieved.
+        // So The final step is to enqueue the messages on the new queue.
 
         AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
         TransactionLog transactionLog = getVirtualHost().getTransactionLog();
@@ -844,7 +859,7 @@
         {
             transactionLog.beginTran(storeContext);
 
-            // Move the messages in on the transaction log.
+            // Move the messages in the transaction log.
             for (QueueEntry entry : entries)
             {
                 AMQMessage message = entry.getMessage();
@@ -853,7 +868,7 @@
                 {
                     transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
                 }
-                // dequeue does not decrement the refence count
+                // dequeue will remove the messages from the queue
                 entry.dequeue(storeContext);
             }
 
@@ -882,10 +897,10 @@
 
         try
         {
+            // Add messages to new queue
             for (QueueEntry entry : entries)
             {
                 toQueue.enqueue(storeContext, entry.getMessage());
-
             }
         }
         catch (MessageCleanupException e)
@@ -918,7 +933,7 @@
                 {
                     if (!entry.isDeleted())
                     {
-                        return entry.getMessage().incrementReference(1);
+                        return true;
                     }
                 }
 
@@ -940,7 +955,7 @@
             {
                 AMQMessage message = entry.getMessage();
 
-                if (message.isReferenced() && message.isPersistent() && toQueue.isDurable())
+                if (!entry.isDeleted() && message.isPersistent() && toQueue.isDurable())
                 {
                     transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
                 }
@@ -973,7 +988,7 @@
         {
             for (QueueEntry entry : entries)
             {
-                if (entry.getMessage().isReferenced())
+                if (!entry.isDeleted())
                 {
                     toQueue.enqueue(storeContext, entry.getMessage());
                 }
@@ -1008,7 +1023,7 @@
                     && !node.isDeleted()
                     && node.acquire())
                 {
-                    node.discard(storeContext);
+                    node.dequeueAndDelete(storeContext);
                 }
 
             }
@@ -1032,7 +1047,7 @@
             QueueEntry node = queueListIterator.getNode();
             if (!node.isDeleted() && node.acquire())
             {
-                node.discard(storeContext);
+                node.dequeueAndDelete(storeContext);
                 noDeletes = false;
             }
 
@@ -1050,7 +1065,7 @@
             QueueEntry node = queueListIterator.getNode();
             if (!node.isDeleted() && node.acquire())
             {
-                node.discard(storeContext);
+                node.dequeueAndDelete(storeContext);
                 count++;
             }
 
@@ -1315,8 +1330,9 @@
             {
                 if (node.acquire())
                 {
+                    // creating a new final store context per message seems wasteful.
                     final StoreContext reapingStoreContext = new StoreContext();
-                    node.discard(reapingStoreContext);
+                    node.dequeueAndDelete(reapingStoreContext);
                 }
             }
             QueueEntry newNode = _entries.next(node);
@@ -1431,7 +1447,7 @@
             QueueEntry node = queueListIterator.getNode();
             if (!node.isDeleted() && node.expired() && node.acquire())
             {
-                node.discard(storeContext);
+                node.dequeueAndDelete(storeContext);
             } 
             else 
             {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java Fri Feb 20 14:50:01 2009
@@ -172,11 +172,6 @@
         _expiration = expiration;
     }
 
-    public boolean isReferenced()
-    {
-        return _referenceCount.get() > 0;
-    }
-
     public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
     {
         return new BodyFrameIterator(protocolSession, channel);
@@ -197,76 +192,6 @@
         return _messageId;
     }
 
-    /* Threadsafe. Increment the reference count on the message. */
-    public boolean incrementReference(int count)
-    {
-        if (_referenceCount.addAndGet(count) <= 1)
-        {
-            int newcount = _referenceCount.addAndGet(-count);
-            _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") to :" + newcount);
-            return false;
-        }
-        else
-        {
-            _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") but count was <=1("
-                       + _referenceCount.get() + ")");
-            return true;
-        }
-
-    }
-
-    /**
-     * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
-     * message store.
-     *
-     * @param storeContext
-     *
-     * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
-     *                                 failed
-     */
-    public void decrementReference(StoreContext storeContext) throws MessageCleanupException
-    {
-
-        int count = _referenceCount.decrementAndGet();
-
-        _log.debug("Message(" + _messageId + ") Decremented Ref count to :" + count);
-
-        // note that the operation of decrementing the reference count and then removing the message does not
-        // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
-        // the message has been passed to all queues. i.e. we are
-        // not relying on the all the increments having taken place before the delivery manager decrements.
-        if (count == 0)
-        {
-            // set the reference count way below 0 so that we can detect that the message has been deleted
-            // this is to guard against the message being spontaneously recreated (from the mgmt console)
-            // by copying from other queues at the same time as it is being removed.
-            _referenceCount.set(Integer.MIN_VALUE / 2);
-
-            try
-            {
-                _log.debug("Reference Count hit 0, removing message");
-                // must check if the handle is null since there may be cases where we decide to throw away a message
-                // and the handle has not yet been constructed
-                // no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op
-                removeMessage(storeContext);
-            }
-            catch (AMQException e)
-            {
-                // to maintain consistency, we revert the count
-                incrementReference(1);
-                throw new MessageCleanupException(getMessageId(), e);
-            }
-        }
-        else
-        {
-            if (count < 0)
-            {
-                throw new MessageCleanupException("Reference count for message id " + debugIdentity()
-                                                  + " has gone below 0.");
-            }
-        }
-    }
-
     /**
      * Called selectors to determin if the message has already been sent
      *
@@ -435,11 +360,6 @@
         return _arrivalTime;
     }
 
-    public void removeMessage(StoreContext storeContext) throws AMQException
-    {
-        //no-op
-    }
-
     public String toString()
     {
         // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Fri Feb 20 14:50:01 2009
@@ -1357,7 +1357,8 @@
 
                 if(message != null)
                 {
-                    message.incrementReference(1);
+                    //todo must enqueue message to build reference table
+//                    message.incrementReference(1);
                 }
                 else
                 {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Fri Feb 20 14:50:01 2009
@@ -30,24 +30,28 @@
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-/** A simple message store that stores the messages in a threadsafe structure in memory.
+/**
+ * A simple message store that stores the messages in a threadsafe structure in memory.
  *
  * NOTE: Now that we have removed the MessageStore interface and are using a TransactionLog
  *
  * This class really should have no storage unless we want to do inMemory Recovery.
- *
  */
 public class MemoryMessageStore implements TransactionLog, RoutingTable
 {
@@ -63,6 +67,7 @@
 
     private final AtomicLong _messageId = new AtomicLong(1);
     private AtomicBoolean _closed = new AtomicBoolean(false);
+    protected final Map<Long, List<AMQQueue>> _messageEnqueueMap = new HashMap<Long, List<AMQQueue>>();
 
     public void configure()
     {
@@ -112,6 +117,7 @@
         }
         _metaDataMap.remove(messageId);
         _contentBodyMap.remove(messageId);
+        _messageEnqueueMap.remove(messageId);
     }
 
     public void createExchange(Exchange exchange) throws AMQException
@@ -134,7 +140,6 @@
 
     }
 
-
     public void createQueue(AMQQueue queue) throws AMQException
     {
         // Not requred to do anything
@@ -152,12 +157,39 @@
 
     public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
     {
-        // Not required to do anything
+        synchronized (_messageEnqueueMap)
+        {
+            List<AMQQueue> queues = _messageEnqueueMap.get(messageId);
+            if (queues == null)
+            {
+                queues = new LinkedList<AMQQueue>();
+                _messageEnqueueMap.put(messageId, queues);
+            }
+
+            queues.add(queue);
+        }
     }
 
     public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
     {
-        // Not required to do anything
+        synchronized (_messageEnqueueMap)
+        {
+            List<AMQQueue> queues = _messageEnqueueMap.get(messageId);
+            if (queues == null || !queues.contains(queue))
+            {
+                throw new RuntimeException("Attempt to dequeue messageID:" + messageId + " from queue:" + queue.getName()
+                                           + " but it is not enqueued on that queue.");
+            }
+            else
+            {
+                queues.remove(queue);
+                if (queues.isEmpty())
+                {
+                    removeMessage(context,messageId);
+                }
+            }
+        }
+
     }
 
     public void beginTran(StoreContext context) throws AMQException
@@ -238,7 +270,7 @@
     }
 
     private void checkNotClosed() throws MessageStoreClosedException
-     {
+    {
         if (_closed.get())
         {
             throw new MessageStoreClosedException();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Fri Feb 20 14:50:01 2009
@@ -144,7 +144,8 @@
 
             StoreContext storeContext = getChannel().getStoreContext();
             try
-            { // if we do not need to wait for client acknowledgements
+            {
+                // if we do not need to wait for client acknowledgements
                 // we can decrement the reference count immediately.
 
                 // By doing this _before_ the send we ensure that it
@@ -153,7 +154,7 @@
 
                 // The send may of course still fail, in which case, as
                 // the message is unacked, it will be lost.
-                entry.dequeue(storeContext);
+                entry.dequeueAndDelete(storeContext);
 
 
                 synchronized (getChannel())
@@ -163,7 +164,6 @@
                     sendToClient(entry, deliveryTag);
 
                 }
-                entry.dispose(storeContext);
             }
             finally
             {
@@ -316,7 +316,7 @@
             _autoClose = false;
         }
 
-
+        _logger.info(debugIdentity()+" Created subscription:");
     }
 
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Fri Feb 20 14:50:01 2009
@@ -92,20 +92,12 @@
         public void process() throws AMQException
         {
 
-            _message.incrementReference(1);
-            try
-            {
                 QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
 
                 if(entry.immediateAndNotDelivered())
                 {
                     getReturnMessages().add(new NoConsumersException(_message));
                 }
-            }
-            finally
-            {
-                _message.decrementReference(getStoreContext());
-            }
         }
     }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Fri Feb 20 14:50:01 2009
@@ -123,18 +123,18 @@
                           unacknowledgedMessageMap.size());
                 unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
                 {
-                    public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
+                    public boolean callback(final long deliveryTag, QueueEntry queueEntry) throws AMQException
                     {
                         if (debug)
                         {
-                            _log.debug("Discarding message: " + message.getMessage().getMessageId());
+                            _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId());
                         }
-                        if(message.getMessage().isPersistent())
+                        if(queueEntry.getMessage().isPersistent())
                         {
                             beginTranIfNecessary();
                         }
-                        //Message has been ack so discard it. This will dequeue and decrement the reference.
-                        message.discard(_storeContext);
+                        //Message has been ack so dequeueAndDelete it.
+                        queueEntry.dequeueAndDelete(_storeContext);
 
                         return false;
                     }
@@ -157,10 +157,10 @@
         }
         else
         {
-            QueueEntry msg;
-            msg = unacknowledgedMessageMap.get(deliveryTag);
+            QueueEntry queueEntry;
+            queueEntry = unacknowledgedMessageMap.get(deliveryTag);
 
-            if (msg == null)
+            if (queueEntry == null)
             {
                 _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
                           _channel.getChannelId());
@@ -170,15 +170,17 @@
 
             if (debug)
             {
-                _log.debug("Discarding message: " + msg.getMessage().getMessageId());
+                _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId());
             }
-            if(msg.getMessage().isPersistent())
+            if(queueEntry.getMessage().isPersistent())
             {
                 beginTranIfNecessary();
             }
 
-            //Message has been ack so discard it. This will dequeue and decrement the reference.
-            msg.discard(_storeContext);
+            //Message has been ack so dequeueAndDelete it.
+            // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
+            // from the transaciton log
+            queueEntry.dequeueAndDelete(_storeContext);
 
             unacknowledgedMessageMap.remove(deliveryTag);
 
@@ -186,7 +188,7 @@
             if (debug)
             {
                 _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
-                           msg.getMessage().getMessageId());
+                           queueEntry.getMessage().getMessageId());
             }
         }
         if(_inTran)

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java Fri Feb 20 14:50:01 2009
@@ -28,6 +28,7 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.transactionlog.TransactionLog;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.queue.AMQMessage;
@@ -37,9 +38,10 @@
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.TransientAMQMessage;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 
@@ -81,20 +83,6 @@
     	combined.stop();
     }
 
-    public void testPrepare() throws AMQException
-    {
-        individual.prepare();
-        multiple.prepare();
-        combined.prepare();
-    }
-
-    public void testUndoPrepare() throws AMQException
-    {
-        individual.undoPrepare();
-        multiple.undoPrepare();
-        combined.undoPrepare();
-    }
-
     public void testCommit() throws AMQException
     {
         individual.commit();
@@ -115,12 +103,13 @@
         private final List<Long> _unacked;
         private StoreContext _storeContext = new StoreContext();
 		private AMQQueue _queue;
+        private TransactionLog _transactionLog = new TestableMemoryMessageStore();
 
         private static final int MESSAGE_SIZE=100;
 
         Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception
         {
-            TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(),
+            TransactionalContext txnContext = new NonTransactionalContext(_transactionLog,
                                                                           _storeContext, null,
                                                                           new LinkedList<RequiredDeliveryException>()
             );
@@ -138,12 +127,15 @@
 
                 MessagePublishInfo info = new MessagePublishInfoImpl();
 
-                AMQMessage message = new TestMessage(deliveryTag, info);
+                AMQMessage message = new TestMessage(deliveryTag, info, (TestTransactionLog) _transactionLog);
 
                 ContentHeaderBody header = new ContentHeaderBody();
                 header.bodySize = MESSAGE_SIZE;
                 message.setPublishAndContentHeaderBody(_storeContext, info, header);
 
+
+
+
                 _map.add(deliveryTag, _queue.enqueue(new StoreContext(), message));
             }
             _acked = acked;
@@ -165,25 +157,6 @@
             }
         }
 
-        void prepare() throws AMQException
-        {
-            _op.consolidate();
-            _op.prepare(_storeContext);
-
-            assertCount(_acked, -1);
-            assertCount(_unacked, 0);
-
-        }
-
-        void undoPrepare()
-        {
-            _op.consolidate();
-            _op.undoPrepare();
-
-            assertCount(_acked, 1);
-            assertCount(_unacked, 0);
-        }
-
         void commit()
         {
             _op.consolidate();
@@ -232,30 +205,22 @@
     private class TestMessage extends TransientAMQMessage
     {
         private final long _tag;
-        private int _count;
+        private TestTransactionLog _transactionLog;
 
-        TestMessage(long tag, MessagePublishInfo publishBody)
+        public TestMessage(long tag, MessagePublishInfo publishBody, TestTransactionLog transactionLog)
                 throws AMQException
         {
             super(createMessage( publishBody));
             _tag = tag;
+            _transactionLog = transactionLog;
         }
 
 
-        public boolean incrementReference(int count)
-        {
-            _count+=count;
-            return true;
-        }
-
-        public void decrementReference(StoreContext context)
-        {
-            _count--;
-        }
-
         void assertCountEquals(int expected)
         {
-            assertEquals("Wrong count for message with tag " + _tag, expected, _count);
+            List<AMQQueue> list = _transactionLog.getMessageReferenceMap(_messageId);
+            int actual = (list == null ? 0 : list.size());
+            assertEquals("Wrong count for message with tag " + _tag, expected, actual);
         }
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Fri Feb 20 14:50:01 2009
@@ -324,7 +324,7 @@
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+                public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
                 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java Fri Feb 20 14:50:01 2009
@@ -30,14 +30,16 @@
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.flow.LimitlessCreditManager;
 import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.util.NullApplicationRegistry;
@@ -57,7 +59,7 @@
 
     private MockProtocolSession _protocolSession;
 
-    private TestMemoryMessageStore _messageStore;
+    private TestableMemoryMessageStore _messageStore;
 
     private StoreContext _storeContext = new StoreContext();
 
@@ -72,14 +74,15 @@
         super.setUp();
         ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
 
-        _messageStore = new TestMemoryMessageStore();
+        VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
+        _messageStore = new TestableMemoryMessageStore((MemoryMessageStore)vhost.getTransactionLog());
         _protocolSession = new MockProtocolSession(_messageStore);
         _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
 
         _protocolSession.addChannel(_channel);
 
-        _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"),
-                                                    null);
+        _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"),
+                                                    true, vhost, null);
     }
 
     protected void tearDown()
@@ -185,7 +188,7 @@
     /**
      * Tests that in no-ack mode no messages are retained
      */
-    public void testPersistentNoAckMode() throws AMQException
+    public void testPersistentNoAckMode() throws AMQException, InterruptedException
     {
         // false arg means no acks expected
         _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager());
@@ -194,7 +197,7 @@
 
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 0);
-        assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
+        assertTrue("Size:" + _messageStore.getMessageMetaDataMap().size(), _messageStore.getMessageMetaDataMap().size() == 0);
         assertTrue(_messageStore.getContentBodyMap().size() == 0);
 
     }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Fri Feb 20 14:50:01 2009
@@ -38,4 +38,9 @@
     {
         super(_defaultList, message);
     }
+
+    public MockQueueEntry(AMQMessage message, SimpleAMQQueue queue)
+    {
+        super(new SimpleQueueEntryList(queue) ,message);
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java Fri Feb 20 14:50:01 2009
@@ -20,18 +20,46 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
+import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
 
 public class PersistentMessageTest extends TransientMessageTest
 {
-    private MemoryMessageStore _messageStore;
+    private TestableMemoryMessageStore _messageStore;
 
-    public void setUp()
+    protected SimpleAMQQueue _queue;
+    protected AMQShortString _q1name = new AMQShortString("q1name");
+    protected AMQShortString _owner = new AMQShortString("owner");
+    protected AMQShortString _routingKey = new AMQShortString("routing key");
+    private TransactionalContext _messageDeliveryContext;
+    private static final long MESSAGE_SIZE = 0L;
+    private List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
+
+    public void setUp() throws Exception
     {
-        _messageStore = new MemoryMessageStore();
-        _messageStore.configure();
+        _messageStore = new TestableMemoryMessageStore();
+
         _storeContext = new StoreContext();
+        VirtualHost vhost = new VirtualHost(PersistentMessageTest.class.getName(), _messageStore);
+        _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_q1name, false, _owner, false, vhost, null);
+        // Create IncomingMessage and nondurable queue
+        _messageDeliveryContext = new NonTransactionalContext(_messageStore, new StoreContext(), null, _returnMessages);
+
     }
 
     @Override
@@ -47,4 +75,86 @@
         assertTrue(_message.isPersistent());
     }
 
+    /**
+     * Tests the returning of a single persistent message to a queue. An immediate message is sent to the queue and
+     * checked that it bounced. The transactionlog and returnMessasges are then checked to ensure they have the right
+     * contents. TransactionLog = Empty, returnMessages 1 item. 
+     *
+     * @throws Exception
+     */
+    public void testImmediateReturnNotInLog() throws Exception
+    {
+        MessagePublishInfo info = new MessagePublishInfoImpl(null, true, false, null);
+        IncomingMessage msg = createMessage(info);
+
+        // Send persistent message
+        ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+        qs.add(_queue);
+
+        // equivalent to amqChannel.routeMessage()
+        msg.enqueue(qs);
+
+        msg.routingComplete(_messageStore);
+
+        // equivalent to amqChannel.deliverCurrentMessageIfComplete
+        msg.deliverToQueues();
+
+        // Check that data has been stored to disk
+        long messageId = msg.getMessageId();
+        checkMessageMetaDataExists(messageId);
+
+        // Check that it was not enqueued
+        List<AMQQueue> queueList = _messageStore.getMessageReferenceMap(messageId);
+        assertNull("TransactionLog contains a queue reference for this messageID:" + messageId, queueList);
+        checkMessageMetaDataRemoved(messageId);
+
+        assertEquals("Return message count not correct", 1, _returnMessages.size());
+    }
+
+    protected IncomingMessage createMessage(MessagePublishInfo info) throws AMQException
+    {
+        IncomingMessage msg = new IncomingMessage(info, _messageDeliveryContext,
+                                                  new MockProtocolSession(_messageStore), _messageStore);
+
+        // equivalent to amqChannel.publishContenHeader
+        ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+        contentHeaderBody.classId = BasicConsumeBodyImpl.CLASS_ID;
+        // This message has no bodies
+        contentHeaderBody.bodySize = MESSAGE_SIZE;
+        contentHeaderBody.properties = new BasicContentHeaderProperties();
+        ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+
+        msg.setContentHeaderBody(contentHeaderBody);
+        msg.setExpiration();
+
+        return msg;
+    }
+
+    protected void checkMessageMetaDataExists(long messageId)
+    {
+        try
+        {
+            _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
+        }
+        catch (AMQException amqe)
+        {
+            fail("Message MetaData does not exist for message:" + messageId);
+        }
+    }
+
+    protected void checkMessageMetaDataRemoved(long messageId)
+    {
+        try
+        {
+            assertNull("Message MetaData still exists for message:" + messageId,
+                       _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId));
+            assertNull("Message still has values in the reference map:" + messageId,
+                       _messageStore.getMessageReferenceMap(messageId));
+
+        }
+        catch (AMQException e)
+        {
+            fail("AMQE thrown whilst trying to getMessageMetaData:" + e.getMessage());
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Fri Feb 20 14:50:01 2009
@@ -36,10 +36,12 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestTransactionLog;
 import org.apache.qpid.server.subscription.MockSubscription;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.transactionlog.TransactionLog;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -319,7 +321,7 @@
     {
         // Create IncomingMessage and nondurable queue
         NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
-        IncomingMessage msg = new IncomingMessage(info, txnContext, null, _store);
+        IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_store), _store);
 
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
         contentHeaderBody.properties = new BasicContentHeaderProperties();
@@ -338,21 +340,22 @@
         _store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
 
         // Check that it is enqueued
-        AMQQueue data = _store.getMessages().get(messageId);
+        List<AMQQueue> data = _store.getMessageReferenceMap(messageId);
         assertNotNull(data);
 
         // Dequeue message
-
         ContentHeaderBody header = new ContentHeaderBody();
         header.bodySize = MESSAGE_SIZE;
         AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store);
         message.setPublishAndContentHeaderBody(new StoreContext(), info, header);
 
-        MockQueueEntry entry = new MockQueueEntry(message);
-        _queue.dequeue(null, entry);
+        MockQueueEntry entry = new MockQueueEntry(message, _queue);
+        entry.getQueueEntryList().add(message);
+        entry.acquire();
+        entry.dequeue(null);
 
         // Check that it is dequeued
-        data = _store.getMessages().get(messageId);
+        data = _store.getMessageReferenceMap(messageId);
         assertNull(data);
     }
 
@@ -381,7 +384,7 @@
 
     public AMQMessage createMessage() throws AMQException
     {
-        AMQMessage message = new TestMessage(info);
+        AMQMessage message = new TestMessage(info, _store);
 
         ContentHeaderBody header = new ContentHeaderBody();
         header.bodySize = MESSAGE_SIZE;
@@ -397,29 +400,21 @@
     public class TestMessage extends TransientAMQMessage
     {
         private final long _tag;
-        private int _count;
+        private TestTransactionLog _transactionLog;
 
-        TestMessage(MessagePublishInfo publishBody)
+        TestMessage(MessagePublishInfo publishBody, TestTransactionLog transactionLog)
                 throws AMQException
         {
             super(SimpleAMQQueueTest.createMessage(publishBody));
             _tag = getMessageId();
+            _transactionLog = transactionLog;
         }
 
-        public boolean incrementReference(int count)
-        {
-            _count+=count;
-            return true;
-        }
-
-        public void decrementReference(StoreContext context)
-        {
-            _count--;
-        }
 
         void assertCountEquals(int expected)
         {
-            assertEquals("Wrong count for message with tag " + _tag, expected, _count);
+            assertEquals("Wrong count for message with tag " + _tag, expected,
+                         _transactionLog.getMessageReferenceMap(_messageId).size());
         }
     }
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java Fri Feb 20 14:50:01 2009
@@ -34,7 +34,7 @@
  */
 public class TestReferenceCounting extends TestCase
 {
-    private TestMemoryMessageStore _store;
+    private TestableMemoryMessageStore _store;
 
     private StoreContext _storeContext = new StoreContext();
 
@@ -42,7 +42,7 @@
     protected void setUp() throws Exception
     {
         super.setUp();
-        _store = new TestMemoryMessageStore();
+        _store = new TestableMemoryMessageStore();
     }
 
     /**
@@ -54,19 +54,9 @@
 
         MessagePublishInfo info = new MessagePublishInfoImpl();
 
-        final long messageId = _store.getNewMessageId();
-
         AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
         message.setPublishAndContentHeaderBody(_storeContext, info, chb);
 
-        message.incrementReference(1);
-
-        // we call routing complete to set up the handle
- //       message.routingComplete(_store, _storeContext, new MessageHandleFactory());
-
-
-        assertEquals(1, _store.getMessageMetaDataMap().size());
-        message.decrementReference(_storeContext);
         assertEquals(1, _store.getMessageMetaDataMap().size());
     }
 
@@ -84,16 +74,10 @@
 
         MessagePublishInfo info = new MessagePublishInfoImpl();
 
-        final Long messageId = _store.getNewMessageId();
         final ContentHeaderBody chb = createPersistentContentHeader();
         AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
         message.setPublishAndContentHeaderBody(_storeContext, info, chb);
         
-        message.incrementReference(1);
-
-        assertEquals(1, _store.getMessageMetaDataMap().size());
-        message.incrementReference(1);
-        message.decrementReference(_storeContext);
         assertEquals(1, _store.getMessageMetaDataMap().size());
     }
 

Copied: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java (from r746259, qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java?p2=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java&p1=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java&r1=746259&r2=746260&rev=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java Fri Feb 20 14:50:01 2009
@@ -1,5 +1,5 @@
 /*
-*
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,24 +18,14 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.queue;
-
-public class MockQueueEntry extends QueueEntryImpl
-{
-    static SimpleQueueEntryList _defaultList = new SimpleQueueEntryList(new MockAMQQueue("MockQueueEntry_DefaultQueue"));
+package org.apache.qpid.server.store;
 
-    public MockQueueEntry()
-    {
-        super(_defaultList);
-    }
+import org.apache.qpid.server.queue.AMQQueue;
 
-    public MockQueueEntry(SimpleQueueEntryList queueEntryList, AMQMessage message)
-    {
-        super(queueEntryList, message);
-    }
+import java.util.Map;
+import java.util.List;
 
-    public MockQueueEntry(AMQMessage message)
-    {
-        super(_defaultList, message);
-    }
+public interface TestTransactionLog
+{
+    public List<AMQQueue> getMessageReferenceMap(Long messageID);
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Fri Feb 20 14:50:01 2009
@@ -23,22 +23,28 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.commons.configuration.Configuration;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Adds some extra methods to the memory message store for testing purposes.
  */
-public class TestableMemoryMessageStore extends MemoryMessageStore
+public class TestableMemoryMessageStore implements TestTransactionLog, TransactionLog, RoutingTable
 {
 
     MemoryMessageStore _mms = null;
-    private HashMap<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>();
 
     public TestableMemoryMessageStore(MemoryMessageStore mms)
     {
@@ -47,46 +53,127 @@
 
     public TestableMemoryMessageStore()
     {
-        _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
-        _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
+        _mms = new MemoryMessageStore();
+        _mms.configure();
     }
 
     public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
     {
-        if (_mms != null)
-        {
-            return _mms._metaDataMap;
-        }
-        else
-        {
-            return _metaDataMap;
-        }
+        return _mms._metaDataMap;
     }
 
     public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
     {
-        if (_mms != null)
-        {
-            return _mms._contentBodyMap;
-        }
-        else
-        {
-            return _contentBodyMap;
-        }
+        return _mms._contentBodyMap;
     }
-    
-    public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+
+    public List<AMQQueue> getMessageReferenceMap(Long messageId)
+    {
+        return _mms._messageEnqueueMap.get(messageId);
+    }
+
+    public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
+    {
+        _mms.configure(virtualHost,base,config);
+    }
+
+    public void close() throws Exception
+    {
+        _mms.close();
+    }
+
+    public void createExchange(Exchange exchange) throws AMQException
+    {
+        _mms.createExchange(exchange);
+    }
+
+    public void removeExchange(Exchange exchange) throws AMQException
+    {
+        _mms.removeExchange(exchange);
+    }
+
+    public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    {
+        _mms.bindQueue(exchange,routingKey,queue,args);
+    }
+
+    public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    {
+        _mms.unbindQueue(exchange,routingKey,queue,args);
+    }
+
+    public void createQueue(AMQQueue queue) throws AMQException
+    {
+        _mms.createQueue(queue);
+    }
+
+    public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
+    {
+        _mms.createQueue(queue,arguments);
+    }
+
+    public void removeQueue(AMQQueue queue) throws AMQException
+    {
+        _mms.removeQueue(queue);
+    }
+
+    public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
+    {
+        _mms.removeMessage(storeContext, messageId);
+    }
+
+    public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+    {
+        _mms.enqueueMessage(context,queue,messageId);
+    }
+
+    public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+    {
+        _mms.dequeueMessage(context,queue,messageId);
+    }
+
+    public void beginTran(StoreContext context) throws AMQException
+    {
+        _mms.beginTran(context);
+    }
+
+    public void commitTran(StoreContext context) throws AMQException
+    {
+        _mms.commitTran(context);
+    }
+
+    public void abortTran(StoreContext context) throws AMQException
+    {
+    _mms.abortTran(context);
+    }
+
+    public boolean inTran(StoreContext context)
+    {
+        return _mms.inTran(context);
+    }
+
+    public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+    {
+        _mms.storeContentBodyChunk(context,messageId,index,contentBody,lastContentBody);
+    }
+
+    public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+    {
+        _mms.storeMessageMetaData(context,messageId,messageMetaData);
+    }
+
+    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
     {
-        getMessages().put(messageId, queue);
+        return _mms.getMessageMetaData(context,messageId);
     }
 
-    public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
     {
-        getMessages().remove(messageId);
+        return _mms.getContentBodyChunk(context,messageId,index);
     }
 
-    public HashMap<Long, AMQQueue> getMessages()
+    public boolean isPersistent()
     {
-        return _messages;
+        return _mms.isPersistent();
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java?rev=746260&r1=746259&r2=746260&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java Fri Feb 20 14:50:01 2009
@@ -22,8 +22,8 @@
 
 import junit.framework.TestCase;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.transactionlog.TransactionLog;
 
 import java.util.LinkedList;
@@ -194,7 +194,7 @@
         }
     }
 
-    class MockStore extends TestMemoryMessageStore
+    class MockStore extends TestableMemoryMessageStore
     {
         final Object BEGIN = "BEGIN";
         final Object ABORT = "ABORT";



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org