You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/04/03 15:38:11 UTC

svn commit: r761673 - in /qpid/branches/0.5-release/qpid/java: broker/src/main/java/org/apache/qpid/server/queue/ broker/src/main/java/org/apache/qpid/server/routing/ broker/src/main/java/org/apache/qpid/server/store/ broker/src/main/java/org/apache/qp...

Author: ritchiem
Date: Fri Apr  3 13:38:11 2009
New Revision: 761673

URL: http://svn.apache.org/viewvc?rev=761673&view=rev
Log:
QPID-1764 : Add a BaseTransactionLog that takes care of handling persistent message references so that the underlying TransactionLog need not worry about that.
    Updated MemoryMS to use this even to ensure that the code is exercised.
    To ensure that the new BaseTransactionLog was correctly used when used by a TransactionLog. The configure() method now returns an Object(TransactionLog) that is the newly configured TL. Existing tests and code where the original TL reference was used have
been changed to use the output of the configure() call.
    NOTE: the return type should be changed to TransactionLog but until we have completely split the TransactionLog and RoutingTable implementations then this is not possible.

    The implementation also includes a number of items from the Flow To Disk review:
    - The old get* Methods have been removed from the TransactionLog interface.
    - Rollback should now rollback enqueues. (No test provided)
    - StoreContext now has enqueue/dequeue methods that track the messageId/Queue pairing
    - The linked list per message has been reduced to a link list per message that is enqueued on multiple queues. Messages that exist on only one queue have no additional overhead.

    - Optimisation also included to:
    Include message delete in 'dequeue transaction' where the message was only ever enqueued on a single queue.
    All other message deletes are peformed as part of an asynchrounous commit.

    The asynchrounous commit is setup via the StoreContext, which has had some work done to move it towards becomming a Qpid Transaction Object where all operations are performed against rather than going via the TransactionLog.

Merged from r760951 on trunk


Added:
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
Modified:
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java
    qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=761673&r1=761672&r2=761673&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Fri Apr  3 13:38:11 2009
@@ -136,11 +136,7 @@
 
             if(_destinationQueues != null)
             {
-                for (int i = 0; i < _destinationQueues.size(); i++)
-                {
-                    transactionLog.enqueueMessage(_txnContext.getStoreContext(),
-                            _destinationQueues.get(i), getMessageId());
-                }
+                transactionLog.enqueueMessage(_txnContext.getStoreContext(), _destinationQueues, getMessageId());
             }
         }
     }

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=761673&r1=761672&r2=761673&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Fri Apr  3 13:38:11 2009
@@ -849,7 +849,11 @@
             {
                 if (entry.isPersistent() && toQueue.isDurable())
                 {
-                    transactionLog.enqueueMessage(storeContext, toQueue, entry.getMessageId());
+                    //FIXME
+                    //fixme
+                    ArrayList list = new ArrayList();
+                    list.add(toQueue);
+                    transactionLog.enqueueMessage(storeContext, list, entry.getMessageId());
                 }
                 // dequeue will remove the messages from the queue
                 entry.dequeue(storeContext);
@@ -941,10 +945,15 @@
             {
                 if (!entry.isDeleted() && entry.isPersistent() && toQueue.isDurable())
                 {
-                    transactionLog.enqueueMessage(storeContext, toQueue, entry.getMessageId());
+                    //fixme
+                    //FIXME
+                    ArrayList list = new ArrayList();
+                    list.add(toQueue);
+                    transactionLog.enqueueMessage(storeContext, list, entry.getMessageId());
                 }
             }
 
+
             // Commit and flush the move transcations.
             try
             {

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java?rev=761673&r1=761672&r2=761673&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java Fri Apr  3 13:38:11 2009
@@ -42,7 +42,7 @@
      *
      * @throws Exception If any error occurs that means the store is unable to configure itself.
      */
-    void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception;
+    Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception;
 
     /**
      * Called to close and cleanup any resources used by the message store.

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=761673&r1=761672&r2=761673&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Fri Apr  3 13:38:11 2009
@@ -34,6 +34,7 @@
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
 import org.apache.qpid.server.routing.RoutingTable;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
@@ -41,7 +42,6 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.commons.configuration.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 
@@ -143,7 +143,7 @@
     private State _state = State.INITIAL;
 
 
-    public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+    public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
     {
         //Only initialise when loaded with the old 'store' confing ignore the new 'RoutingTable' config
         if (base.equals("store"))
@@ -178,7 +178,9 @@
             recover();
 
             stateTransition(State.RECOVERING, State.STARTED);
+            return new BaseTransactionLog(this);
         }
+        return null;
     }
 
     private static synchronized void initialiseDriver() throws ClassNotFoundException
@@ -825,7 +827,18 @@
 
     }
 
-    public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+    public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
+    {
+        for (AMQQueue q : queues)
+        {
+            if (q.isDurable())
+            {
+                enqueueMessage(context,q,messageId);
+            }
+        }
+    }
+
+    void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
     {
         AMQShortString name = queue.getName();
 

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=761673&r1=761672&r2=761673&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Fri Apr  3 13:38:11 2009
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.store;
 
-import org.apache.commons.configuration.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
@@ -30,17 +29,14 @@
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.routing.RoutingTable;
 import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -67,16 +63,16 @@
 
     private final AtomicLong _messageId = new AtomicLong(1);
     private AtomicBoolean _closed = new AtomicBoolean(false);
-    protected final Map<Long, List<AMQQueue>> _messageEnqueueMap = new HashMap<Long, List<AMQQueue>>();
 
-    public void configure()
+    public TransactionLog configure()
     {
         _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables");
         _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY);
         _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(DEFAULT_HASHTABLE_CAPACITY);
+        return new BaseTransactionLog(this);
     }
 
-    public void configure(String base, VirtualHostConfiguration config)
+    public TransactionLog configure(String base, VirtualHostConfiguration config)
     {
         //Only initialise when called with current 'store' configs i.e. don't reinit when used as a 'RoutingTable'
         if (base.equals("store"))
@@ -85,12 +81,14 @@
             _log.info("Using capacity " + hashtableCapacity + " for hash tables");
             _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
             _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity);
+            return new BaseTransactionLog(this);
         }
+        return null;
     }
 
-    public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+    public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
     {
-        configure(base, config);
+        return configure(base, config);
     }
 
     public void close() throws Exception
@@ -108,7 +106,7 @@
         }
     }
 
-    private void removeMessage(StoreContext context, Long messageId) throws AMQException
+    public void removeMessage(StoreContext context, Long messageId) throws AMQException
     {
         checkNotClosed();
         if (_log.isDebugEnabled())
@@ -117,7 +115,6 @@
         }
         _metaDataMap.remove(messageId);
         _contentBodyMap.remove(messageId);
-        _messageEnqueueMap.remove(messageId);
     }
 
     public void createExchange(Exchange exchange) throws AMQException
@@ -155,41 +152,25 @@
         // Not required to do anything
     }
 
-    public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+    public void enqueueMessage(StoreContext context, final ArrayList<AMQQueue> queues, Long messageId) throws AMQException
     {
-        synchronized (_messageEnqueueMap)
+        for (AMQQueue q : queues)
         {
-            List<AMQQueue> queues = _messageEnqueueMap.get(messageId);
-            if (queues == null)
+            if (q.isDurable())
             {
-                queues = new LinkedList<AMQQueue>();
-                _messageEnqueueMap.put(messageId, queues);
+                enqueueMessage(context,q,messageId);
             }
-
-            queues.add(queue);
         }
     }
 
-    public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+    public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
     {
-        synchronized (_messageEnqueueMap)
-        {
-            List<AMQQueue> queues = _messageEnqueueMap.get(messageId);
-            if (queues == null || !queues.contains(queue))
-            {
-                throw new RuntimeException("Attempt to dequeue messageID:" + messageId + " from queue:" + queue.getName()
-                                           + " but it is not enqueued on that queue.");
-            }
-            else
-            {
-                queues.remove(queue);
-                if (queues.isEmpty())
-                {
-                    removeMessage(context,messageId);
-                }
-            }
-        }
+        // Not required to do anything
+    }
 
+    public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+    {
+        // Not required to do anything
     }
 
     public void beginTran(StoreContext context) throws AMQException

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?rev=761673&r1=761672&r2=761673&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java Fri Apr  3 13:38:11 2009
@@ -21,6 +21,12 @@
 package org.apache.qpid.server.store;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQQueue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * A context that the store can use to associate with a transactional context. For example, it could store
@@ -32,17 +38,37 @@
 {
     private static final Logger _logger = Logger.getLogger(StoreContext.class);
 
+    private static final String DEFAULT_NAME = "StoreContext";
     private String _name;
     private Object _payload;
+    private HashMap<Long, ArrayList<AMQQueue>> _enqueueMap;
+    private HashMap<Long, ArrayList<AMQQueue>> _dequeueMap;
+    private boolean _async;
 
     public StoreContext()
     {
-        _name = "StoreContext";
+        this(DEFAULT_NAME);
     }
 
     public StoreContext(String name)
     {
+        this(name,false);
+    }
+
+    /**
+     *
+     * @param name The name of this Transaction
+     * @param asynchrouous Is this Transaction Asynchronous
+     */
+    public StoreContext(String name, boolean asynchrouous)
+    {
         _name = name;
+        _async = asynchrouous;
+    }
+
+    public StoreContext(boolean asynchronous)
+    {
+        this(DEFAULT_NAME, asynchronous);
     }
 
     public Object getPayload()
@@ -52,7 +78,7 @@
 
     public void setPayload(Object payload)
     {
-        if(_logger.isDebugEnabled())
+        if (_logger.isDebugEnabled())
         {
             _logger.debug("public void setPayload(Object payload = " + payload + "): called");
         }
@@ -68,4 +94,95 @@
     {
         return "<_name = " + _name + ", _payload = " + _payload + ">";
     }
+
+    public Map<Long, ArrayList<AMQQueue>> getEnqueueMap()
+    {
+        return _enqueueMap;
+    }
+
+    public Map<Long, ArrayList<AMQQueue>> getDequeueMap()
+    {
+        return _dequeueMap;
+    }
+
+    /**
+     * Record the enqueues for processing if we abort
+     *
+     * @param queues
+     * @param messageId
+     *
+     * @throws AMQException
+     */
+    public void enqueueMessage(ArrayList<AMQQueue> queues, Long messageId) throws AMQException
+    {
+        if (inTransaction())
+        {
+            ArrayList<AMQQueue> enqueues = _enqueueMap.get(messageId);
+
+            if (enqueues == null)
+            {
+                enqueues = new ArrayList<AMQQueue>();
+                _enqueueMap.put(messageId, enqueues);
+            }
+
+            for (AMQQueue q : queues)
+            {
+                if (!enqueues.contains(q))
+                {
+                    enqueues.add(q);
+                }
+            }
+
+        }
+    }
+
+    /**
+     * Record the dequeue for processing on commit
+     *
+     * @param queue
+     * @param messageId
+     *
+     * @throws AMQException
+     */
+    public void dequeueMessage(AMQQueue queue, Long messageId) throws AMQException
+    {
+        if (inTransaction())
+        {
+            ArrayList<AMQQueue> dequeues = _dequeueMap.get(messageId);
+
+            if (dequeues == null)
+            {
+                dequeues = new ArrayList<AMQQueue>();
+                _dequeueMap.put(messageId, dequeues);
+            }
+
+            dequeues.add(queue);
+        }
+    }
+
+    public void beginTransaction() throws AMQException
+    {
+        _enqueueMap = new HashMap<Long, ArrayList<AMQQueue>>();
+        _dequeueMap = new HashMap<Long, ArrayList<AMQQueue>>();
+    }
+
+    public void commitTransaction() throws AMQException
+    {
+        _dequeueMap.clear();
+    }
+
+    public void abortTransaction() throws AMQException
+    {
+        _enqueueMap.clear();
+    }
+
+    public boolean inTransaction()
+    {
+        return _payload != null;
+    }
+
+    public boolean isAsync()
+    {
+        return _async;
+    }
 }

Added: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java?rev=761673&view=auto
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java (added)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java Fri Apr  3 13:38:11 2009
@@ -0,0 +1,234 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transactionlog;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class BaseTransactionLog implements TransactionLog
+{
+    private static final Logger _logger = Logger.getLogger(BaseTransactionLog.class);
+
+    TransactionLog _delegate;
+    private Map<Long, ArrayList<AMQQueue>> _idToQueues = new HashMap<Long, ArrayList<AMQQueue>>();
+
+    public BaseTransactionLog(TransactionLog delegate)
+    {
+        _delegate = delegate;
+    }
+
+    public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+    {
+        return _delegate.configure(virtualHost, base, config);
+    }
+
+    public void close() throws Exception
+    {
+        _delegate.close();
+    }
+
+    public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
+    {
+        context.enqueueMessage(queues, messageId);
+
+        if (queues.size() > 0)
+        {
+            _logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues);
+
+            //Clone the list incase someone else changes it.
+            _idToQueues.put(messageId, (ArrayList) queues.clone());
+        }
+
+        _delegate.enqueueMessage(context, queues, messageId);
+    }
+
+    public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+    {
+        if (context.inTransaction())
+        {
+            context.dequeueMessage(queue, messageId);
+
+            Map<Long, ArrayList<AMQQueue>> messageMap = context.getDequeueMap();
+
+            //For each Message ID that is in the map check
+            for (Long messageID : messageMap.keySet())
+            {
+                //If we don't have a gloabl reference for this message then there is only a single enqueue
+                if (_idToQueues.get(messageID) == null)
+                {
+                    // Add the removal of the message to this transaction
+                    _delegate.removeMessage(context,messageID);
+                    // Remove this message ID as we have processed it so we don't reprocess after the main commmit
+                    messageMap.remove(messageID);
+                }
+            }
+        }
+
+        _delegate.dequeueMessage(context, queue, messageId);
+
+        if (!context.inTransaction())
+        {
+            HashMap<Long, ArrayList<AMQQueue>> dequeue = new HashMap<Long, ArrayList<AMQQueue>>();
+            ArrayList list = new ArrayList();
+            list.add(queue);
+            dequeue.put(messageId, list);
+            processDequeues(dequeue);
+        }
+    }
+
+    /**
+     * This should not be called from main broker code.
+     * // Perhaps we need a new interface:
+     *
+     * Broker <->TransactionLog
+     * Broker <->BaseTransactionLog<->(Log with removeMessage())
+     */
+    public void removeMessage(StoreContext context, Long messageId) throws AMQException
+    {
+        _delegate.removeMessage(context, messageId);
+    }
+
+    public void beginTran(StoreContext context) throws AMQException
+    {
+        context.beginTransaction();
+        _delegate.beginTran(context);
+    }
+
+    public void commitTran(StoreContext context) throws AMQException
+    {
+        //Perform real commit of current data
+        _delegate.commitTran(context);
+
+        // If we have dequeues to process then process them
+        if (context.getDequeueMap() != null)
+        {
+            processDequeues(context.getDequeueMap());
+        }
+
+        //Commit the recorded state for this transaction.
+        context.commitTransaction();
+    }
+
+    public void abortTran(StoreContext context) throws AMQException
+    {
+        // If we have enqueues to rollback
+        if (context.getEnqueueMap() != null)
+        {
+            processDequeues(context.getEnqueueMap());
+        }
+        //Abort the recorded state for this transaction.
+        context.abortTransaction();
+
+        _delegate.abortTran(context);
+    }
+
+    private void processDequeues(Map<Long, ArrayList<AMQQueue>> messageMap)
+            throws AMQException
+    {
+        // Process any enqueues to bring our model up to date.
+        Set<Long> messageIDs = messageMap.keySet();
+
+        //Create a new Asynchronous Context.
+        StoreContext removeContext = new StoreContext(true);
+
+        //Batch Process the Dequeues on the delegate
+        _delegate.beginTran(removeContext);
+
+        try
+        {
+            //For each Message ID Decrement the reference for each of the queues it was on.
+            for (Long messageID : messageIDs)
+            {
+                ArrayList<AMQQueue> queueList = messageMap.get(messageID);
+
+                // For each of the queues decrement the reference
+                for (AMQQueue queue : queueList)
+                {
+                    ArrayList<AMQQueue> enqueuedList = _idToQueues.get(messageID);
+
+                    // If we have no mapping then this message was only enqueued on a single queue
+                    // This will be the case when we are not in a larger transaction
+                    if (enqueuedList == null)
+                    {
+                        _delegate.removeMessage(removeContext, messageID);
+                    }
+                    else
+                    {
+                        // Update the enqueued list
+                        enqueuedList.remove(queue);
+
+                        // If the list is now empty then remove the message
+                        if (enqueuedList.isEmpty())
+                        {
+                            _delegate.removeMessage(removeContext, messageID);
+                        }
+                    }
+                }
+            }
+
+            //Commit the removes on the delegate.
+            _delegate.commitTran(removeContext);
+        }
+        finally
+        {
+            if (removeContext.inTransaction())
+            {
+                _delegate.abortTran(removeContext);
+            }
+        }
+    }
+
+    public boolean inTran(StoreContext context)
+    {
+        return _delegate.inTran(context);
+    }
+
+    public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+    {
+        _delegate.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
+    }
+
+    public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+    {
+        _delegate.storeMessageMetaData(context, messageId, messageMetaData);
+    }
+
+    public boolean isPersistent()
+    {
+        return _delegate.isPersistent();
+    }
+
+    public TransactionLog getDelegate()
+    {
+        return _delegate;
+    }
+}

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java?rev=761673&r1=761672&r2=761673&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java Fri Apr  3 13:38:11 2009
@@ -20,19 +20,16 @@
  */
 package org.apache.qpid.server.transactionlog;
 
-import org.apache.commons.configuration.Configuration;
-
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.store.StoreContext;
 
+import java.util.ArrayList;
+
 /**
  * TransactionLog defines the interface for performing transactions.
  * This is used to preserve the state of messages, queues
@@ -68,7 +65,7 @@
      *
      * @throws Exception If any error occurs that means the store is unable to configure itself.
      */
-    void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception;
+    Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception;
 
     /**
      * Called to close and cleanup any resources used by the message store.
@@ -81,27 +78,33 @@
      * Places a message onto a specified queue, in a given transactional context.
      *
      * @param context   The transactional context for the operation.
-     * @param queue     The queue to place the message on.
-     * @param messageId The message to enqueue.
-     * @throws AMQException If the operation fails for any reason.
+     * @param queues
+     *@param messageId The message to enqueue.  @throws AMQException If the operation fails for any reason.  @throws org.apache.qpid.AMQException
      */
-    void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException;
+    void enqueueMessage(StoreContext context, final ArrayList<AMQQueue> queues, Long messageId) throws AMQException;
 
     /**
      * Extracts a message from a specified queue, in a given transactional context.
      *
      * @param context   The transactional context for the operation.
-     * @param queue     The queue to place the message on.
-     * @param messageId The message to dequeue.
-     * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+     * @param queue
+     * @param messageId The message to dequeue.  @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
      */
     void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException;
 
     /**
+     * Remove the specified message from the log
+     *
+     * @param context The transactional context for the operation
+     * @param messageId The message to remove
+     * @throws AMQException
+     */
+    void removeMessage(StoreContext context, Long messageId) throws AMQException;
+
+    /**
      * Begins a transactional context.
      *
      * @param context The transactional context to begin.
-     *
      * @throws AMQException If the operation fails for any reason.
      */
     void beginTran(StoreContext context) throws AMQException;
@@ -158,31 +161,31 @@
      * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
      */
     void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException;
-
-    /**
-     * Retrieves message meta-data.
-     *
-     * @param context   The transactional context for the operation.
-     * @param messageId The message to get the meta-data for.
-     *
-     * @return The message meta data.
-     *
-     * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
-     */
-    MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
-
-    /**
-     * Retrieves a chunk of message data.
-     *
-     * @param context   The transactional context for the operation.
-     * @param messageId The message to get the data chunk for.
-     * @param index     The offset index of the data chunk within the message.
-     *
-     * @return A chunk of message data.
-     *
-     * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
-     */
-    ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
+//
+//    /**
+//     * Retrieves message meta-data.
+//     *
+//     * @param context   The transactional context for the operation.
+//     * @param messageId The message to get the meta-data for.
+//     *
+//     * @return The message meta data.
+//     *
+//     * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+//     */
+//    MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
+//
+//    /**
+//     * Retrieves a chunk of message data.
+//     *
+//     * @param context   The transactional context for the operation.
+//     * @param messageId The message to get the data chunk for.
+//     * @param index     The offset index of the data chunk within the message.
+//     *
+//     * @return A chunk of message data.
+//     *
+//     * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+//     */
+//    ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
 
     /**
      * Is this store capable of persisting the data

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=761673&r1=761672&r2=761673&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Fri Apr  3 13:38:11 2009
@@ -51,6 +51,7 @@
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
 import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
 
 import javax.management.NotCompliantMBeanException;
 import java.util.Collections;
@@ -206,6 +207,14 @@
             {
                 _routingTable = (RoutingTable) _transactionLog;
             }
+            else if (_transactionLog instanceof BaseTransactionLog)
+            {
+                TransactionLog delegate = ((BaseTransactionLog) _transactionLog).getDelegate();
+                if (delegate instanceof RoutingTable)
+                {
+                    _routingTable = (RoutingTable) delegate;
+                }
+            }
         }
         else
         {
@@ -292,7 +301,8 @@
             _routingTable = (RoutingTable) _transactionLog;
         }
 
-        _transactionLog.configure(this, "store", config);
+        // If a TransactionLog uses the BaseTransactionLog then it will return this object.
+        _transactionLog = (TransactionLog) _transactionLog.configure(this, "store", config);
     }
 
     //todo we need to move from store.class to transactionlog.class
@@ -497,8 +507,9 @@
         public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>();
         public List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>();
 
-        public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+        public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
         {
+            return null;
         }
 
         public void close() throws Exception

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=761673&r1=761672&r2=761673&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Fri Apr  3 13:38:11 2009
@@ -26,6 +26,7 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.store.StoreContext;
 
 import java.util.ArrayList;
 
@@ -111,7 +112,7 @@
         arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES);
 
         // Create IncomingMessage and nondurable queue
-        NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+        NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null);
 
         //Create a priorityQueue
         _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testMessagesFlowToDiskWithPriority"), false, _owner, false, _virtualHost, arguments);

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=761673&r1=761672&r2=761673&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Fri Apr  3 13:38:11 2009
@@ -113,7 +113,7 @@
     private void verifyBrokerState()
     {
 
-        TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getTransactionLog());
+        TestableMemoryMessageStore store = new TestableMemoryMessageStore(_virtualHost.getTransactionLog());
 
         // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
         assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());       

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=761673&r1=761672&r2=761673&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java Fri Apr  3 13:38:11 2009
@@ -75,7 +75,7 @@
         ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
 
         VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
-        _messageStore = new TestableMemoryMessageStore((MemoryMessageStore)vhost.getTransactionLog());
+        _messageStore = new TestableMemoryMessageStore(vhost.getTransactionLog());
         _protocolSession = new MockProtocolSession(_messageStore);
         _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
 

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java?rev=761673&r1=761672&r2=761673&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java Fri Apr  3 13:38:11 2009
@@ -109,7 +109,7 @@
 
         // Check that it was not enqueued
         List<AMQQueue> queueList = _messageStore.getMessageReferenceMap(messageId);
-        assertNull("TransactionLog contains a queue reference for this messageID:" + messageId, queueList);
+        assertTrue("TransactionLog contains a queue reference for this messageID:" + messageId, queueList == null || queueList.isEmpty());
         checkMessageMetaDataRemoved(messageId);
 
         assertEquals("Return message count not correct", 1, _returnMessages.size());
@@ -152,8 +152,8 @@
         {
             assertNull("Message MetaData still exists for message:" + messageId,
                        _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId));
-            assertNull("Message still has values in the reference map:" + messageId,
-                       _messageStore.getMessageReferenceMap(messageId));
+            List ids = _messageStore.getMessageReferenceMap(messageId);
+            assertTrue("Message still has values in the reference map:" + messageId, ids == null || ids.isEmpty());
 
         }
         catch (AMQException e)

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=761673&r1=761672&r2=761673&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Fri Apr  3 13:38:11 2009
@@ -319,7 +319,7 @@
     public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
     {
         // Create IncomingMessage and nondurable queue
-        NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+        NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null);
         IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
 
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
@@ -351,17 +351,17 @@
         MockQueueEntry entry = new MockQueueEntry(message, _queue);
         entry.getQueueEntryList().add(message);
         entry.acquire();
-        entry.dequeue(null);
+        entry.dequeue(new StoreContext());
 
         // Check that it is dequeued
         data = _transactionLog.getMessageReferenceMap(messageId);
-        assertNull(data);
+        assertTrue(data == null || data.isEmpty());
     }
 
     public void testMessagesFlowToDisk() throws AMQException, InterruptedException
     {
         // Create IncomingMessage and nondurable queue
-        NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+        NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null);
 
         MESSAGE_SIZE = 1;
         long MEMORY_MAX = 500;
@@ -431,7 +431,7 @@
     public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException
     {
         // Create IncomingMessage and nondurable queue
-        NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+        NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null);
 
         MESSAGE_SIZE = 1;
         /** Set to larger than the purge batch size. Default 100.

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java?rev=761673&r1=761672&r2=761673&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java Fri Apr  3 13:38:11 2009
@@ -34,6 +34,7 @@
 import org.apache.qpid.server.routing.RoutingTable;
 
 import java.util.List;
+import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -48,9 +49,9 @@
     {
     }
     
-    public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+    public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        return this;
     }
 
     public void close() throws Exception
@@ -146,7 +147,7 @@
 
     }
 
-    public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+    public void enqueueMessage(StoreContext context, final ArrayList<AMQQueue> queues, Long messageId) throws AMQException
     {
 
     }

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?rev=761673&r1=761672&r2=761673&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Fri Apr  3 13:38:11 2009
@@ -21,155 +21,191 @@
 package org.apache.qpid.server.store;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
 import org.apache.qpid.server.transactionlog.TransactionLog;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.commons.configuration.Configuration;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 
-/**
- * Adds some extra methods to the memory message store for testing purposes.
- */
+/** Adds some extra methods to the memory message store for testing purposes. */
 public class TestableMemoryMessageStore implements TestTransactionLog, TransactionLog, RoutingTable
 {
+    private TransactionLog _transactionLog;
+    private RoutingTable _routingTable;
+    private MemoryMessageStore _mms;
+
+    public TestableMemoryMessageStore(TransactionLog log)
+    {
+        _transactionLog = log;
+        if (log instanceof BaseTransactionLog)
+        {
+            TransactionLog delegate = ((BaseTransactionLog) log).getDelegate();
+            if (delegate instanceof RoutingTable)
+            {
+                _routingTable = (RoutingTable) delegate;
+            }
+            else
+            {
+                throw new RuntimeException("Specified BaseTransactionLog does not delegate to a RoutingTable:" + log);
+            }
+
+            if (delegate instanceof MemoryMessageStore)
+            {
+                _mms = (MemoryMessageStore) delegate;
+            }
+
+        }
+        else
+        {
+            throw new RuntimeException("Specified BaseTransactionLog is not testable:" + log);
+        }
 
-    MemoryMessageStore _mms = null;
+    }
 
     public TestableMemoryMessageStore(MemoryMessageStore mms)
     {
-        _mms = mms;
+        _routingTable = mms;
+        _transactionLog = mms.configure();
     }
 
     public TestableMemoryMessageStore()
     {
         _mms = new MemoryMessageStore();
-        _mms.configure();
+        _transactionLog = _mms.configure();
+        _routingTable = _mms;        
     }
 
     public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
     {
-        return _mms._metaDataMap;
+        return ((MemoryMessageStore) _routingTable)._metaDataMap;
     }
 
     public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
     {
-        return _mms._contentBodyMap;
+        return ((MemoryMessageStore) _routingTable)._contentBodyMap;
     }
 
     public List<AMQQueue> getMessageReferenceMap(Long messageId)
     {
-        return _mms._messageEnqueueMap.get(messageId);
+//        return _mms._messageEnqueueMap.get(messageId);
+//        ((BaseTransactionLog)_transactionLog).
+        return new ArrayList<AMQQueue>();
     }
 
-    public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+    public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
     {
-        _mms.configure(virtualHost,base,config);
+        _transactionLog  = (TransactionLog) _transactionLog.configure(virtualHost, base, config);
+        return _transactionLog;
     }
 
     public void close() throws Exception
     {
-        _mms.close();
+        _transactionLog.close();
+        _routingTable.close();
     }
 
     public void createExchange(Exchange exchange) throws AMQException
     {
-        _mms.createExchange(exchange);
+        _routingTable.createExchange(exchange);
     }
 
     public void removeExchange(Exchange exchange) throws AMQException
     {
-        _mms.removeExchange(exchange);
+        _routingTable.removeExchange(exchange);
     }
 
     public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
     {
-        _mms.bindQueue(exchange,routingKey,queue,args);
+        _routingTable.bindQueue(exchange, routingKey, queue, args);
     }
 
     public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
     {
-        _mms.unbindQueue(exchange,routingKey,queue,args);
+        _routingTable.unbindQueue(exchange, routingKey, queue, args);
     }
 
     public void createQueue(AMQQueue queue) throws AMQException
     {
-        _mms.createQueue(queue);
+        _routingTable.createQueue(queue);
     }
 
     public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
     {
-        _mms.createQueue(queue,arguments);
+        _routingTable.createQueue(queue, arguments);
     }
 
     public void removeQueue(AMQQueue queue) throws AMQException
     {
-        _mms.removeQueue(queue);
+        _routingTable.removeQueue(queue);
     }
 
-    public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+    public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
     {
-        _mms.enqueueMessage(context,queue,messageId);
+        _transactionLog.enqueueMessage(context, queues, messageId);
     }
 
     public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
     {
-        _mms.dequeueMessage(context,queue,messageId);
+        _transactionLog.dequeueMessage(context, queue, messageId);
+    }
+
+    public void removeMessage(StoreContext context, Long messageId) throws AMQException
+    {
+        _transactionLog.removeMessage(context, messageId);
     }
 
     public void beginTran(StoreContext context) throws AMQException
     {
-        _mms.beginTran(context);
+        _transactionLog.beginTran(context);
     }
 
     public void commitTran(StoreContext context) throws AMQException
     {
-        _mms.commitTran(context);
+        _transactionLog.commitTran(context);
     }
 
     public void abortTran(StoreContext context) throws AMQException
     {
-    _mms.abortTran(context);
+        _transactionLog.abortTran(context);
     }
 
     public boolean inTran(StoreContext context)
     {
-        return _mms.inTran(context);
+        return _transactionLog.inTran(context);
     }
 
     public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
     {
-        _mms.storeContentBodyChunk(context,messageId,index,contentBody,lastContentBody);
+        _transactionLog.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
     }
 
     public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
     {
-        _mms.storeMessageMetaData(context,messageId,messageMetaData);
+        _transactionLog.storeMessageMetaData(context, messageId, messageMetaData);
     }
 
-    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+    public boolean isPersistent()
     {
-        return _mms.getMessageMetaData(context,messageId);
+        return _transactionLog.isPersistent();
     }
 
-    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
     {
-        return _mms.getContentBodyChunk(context,messageId,index);
+        return _mms.getMessageMetaData(context, messageId);
     }
 
-    public boolean isPersistent()
+    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
     {
-        return _mms.isPersistent();
+        return _mms.getContentBodyChunk(context, messageId, index);
     }
 }

Added: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java?rev=761673&view=auto
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java (added)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java Fri Apr  3 13:38:11 2009
@@ -0,0 +1,535 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transactionlog;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.queue.MockContentChunk;
+import org.apache.qpid.server.queue.MockPersistentAMQMessage;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+public class BaseTransactionLogTest extends TestCase implements TransactionLog
+{
+    private boolean _inTransaction;
+    final private Map<Long, ArrayList<AMQQueue>> _enqueues = new HashMap<Long, ArrayList<AMQQueue>>();
+    final private Map<Long, ArrayList<ContentChunk>> _storeChunks = new HashMap<Long, ArrayList<ContentChunk>>();
+    final private Map<Long, MessageMetaData> _storeMetaData = new HashMap<Long, MessageMetaData>();
+
+    BaseTransactionLog _transactionLog;
+    private ArrayList<AMQQueue> _queues;
+    private MockPersistentAMQMessage _message;
+
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _transactionLog = new BaseTransactionLog(this);
+    }
+
+    public void testSingleEnqueueNoTransactional() throws AMQException
+    {
+        //Store Data
+
+        _message = new MockPersistentAMQMessage(1L, this);
+
+        _message.addContentBodyFrame(new StoreContext(), new MockContentChunk(100), true);
+
+        MessagePublishInfo mpi = new MessagePublishInfoImpl();
+
+        ContentHeaderBody chb = new ContentHeaderBody();
+
+        _message.setPublishAndContentHeaderBody(new StoreContext(), mpi, chb);
+
+        verifyMessageStored(_message.getMessageId());
+        // Enqueue
+
+        _queues = new ArrayList<AMQQueue>();
+        MockAMQQueue queue = new MockAMQQueue(this.getName());
+        _queues.add(queue);
+
+        _transactionLog.enqueueMessage(new StoreContext(), _queues, _message.getMessageId());
+
+        verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
+    }
+
+    public void testSingleDequeueNoTransaction() throws AMQException
+    {
+        // Enqueue a message to dequeue
+        testSingleEnqueueNoTransactional();
+
+        _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId());
+
+        assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
+        assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
+        assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+    }
+
+    public void testSingleEnqueueTransactional() throws AMQException
+    {
+        StoreContext context = new StoreContext();
+
+        _transactionLog.beginTran(context);
+
+        //Store Data
+        _message = new MockPersistentAMQMessage(1L, this);
+
+        _message.addContentBodyFrame(context, new MockContentChunk(100), true);
+
+        MessagePublishInfo mpi = new MessagePublishInfoImpl();
+
+        ContentHeaderBody chb = new ContentHeaderBody();
+
+        _message.setPublishAndContentHeaderBody(context, mpi, chb);
+
+        _transactionLog.commitTran(context);
+
+        verifyMessageStored(_message.getMessageId());
+
+        // Enqueue
+        _transactionLog.beginTran(context);
+
+        _queues = new ArrayList<AMQQueue>();
+        MockAMQQueue queue = new MockAMQQueue(this.getName());
+        _queues.add(queue);
+
+        _transactionLog.enqueueMessage(context, _queues, _message.getMessageId());
+
+        _transactionLog.commitTran(context);
+        verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
+    }
+
+    public void testSingleDequeueTransaction() throws AMQException
+    {
+        // Enqueue a message to dequeue
+        testSingleEnqueueTransactional();
+
+        StoreContext context = new StoreContext();
+
+        _transactionLog.beginTran(context);
+
+        _transactionLog.dequeueMessage(context,_queues.get(0), _message.getMessageId());
+
+        _transactionLog.commitTran(context);
+
+        assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
+        assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
+        assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+    }
+
+
+    public void testMultipleEnqueueNoTransactional() throws AMQException
+    {
+        //Store Data
+
+        _message = new MockPersistentAMQMessage(1L, this);
+
+        _message.addContentBodyFrame(new StoreContext(), new MockContentChunk(100), true);
+
+        MessagePublishInfo mpi = new MessagePublishInfoImpl();
+
+        ContentHeaderBody chb = new ContentHeaderBody();
+
+        _message.setPublishAndContentHeaderBody(new StoreContext(), mpi, chb);
+
+        verifyMessageStored(_message.getMessageId());
+        // Enqueue
+
+        _queues = new ArrayList<AMQQueue>();
+
+        MockAMQQueue queue = new MockAMQQueue(this.getName());
+        _queues.add(queue);
+
+        queue = new MockAMQQueue(this.getName() + "2");
+        _queues.add(queue);
+
+        queue = new MockAMQQueue(this.getName() + "3");
+        _queues.add(queue);
+
+        _transactionLog.enqueueMessage(new StoreContext(), _queues, _message.getMessageId());
+
+        verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
+    }
+
+    public void testMultipleDequeueNoTransaction() throws AMQException
+    {
+        // Enqueue a message to dequeue
+        testMultipleEnqueueNoTransactional();
+
+        _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId());
+
+        ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId());
+        assertNotNull("Message not enqueued", enqueued);
+        assertFalse("Message still enqueued on the first queue,",enqueued.contains(_queues.get(0)));
+        assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size());
+
+        assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
+        assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+
+
+        _transactionLog.dequeueMessage(new StoreContext(),_queues.get(1), _message.getMessageId());
+
+        enqueued = _enqueues.get(_message.getMessageId());
+        assertNotNull("Message not enqueued", enqueued);
+        assertFalse("Message still enqueued on the second queue,",enqueued.contains(_queues.get(1)));
+        assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size());
+        
+        assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
+        assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+
+        _transactionLog.dequeueMessage(new StoreContext(),_queues.get(2), _message.getMessageId());
+
+        assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
+        assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
+        assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+    }
+
+
+    public void testMultipleEnqueueTransactional() throws AMQException
+    {
+        StoreContext context = new StoreContext();
+
+        _transactionLog.beginTran(context);
+
+        //Store Data
+        _message = new MockPersistentAMQMessage(1L, this);
+
+        _message.addContentBodyFrame(context, new MockContentChunk(100), true);
+
+        MessagePublishInfo mpi = new MessagePublishInfoImpl();
+
+        ContentHeaderBody chb = new ContentHeaderBody();
+
+        _message.setPublishAndContentHeaderBody(context, mpi, chb);
+
+        _transactionLog.commitTran(context);
+
+        verifyMessageStored(_message.getMessageId());
+
+        // Enqueue
+        _transactionLog.beginTran(context);
+
+        _queues = new ArrayList<AMQQueue>();
+        MockAMQQueue queue = new MockAMQQueue(this.getName());
+        _queues.add(queue);
+
+        queue = new MockAMQQueue(this.getName() + "2");
+        _queues.add(queue);
+
+        queue = new MockAMQQueue(this.getName() + "3");
+        _queues.add(queue);
+
+        _transactionLog.enqueueMessage(context, _queues, _message.getMessageId());
+
+        _transactionLog.commitTran(context);
+        verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
+    }
+
+    public void testMultipleDequeueMultipleTransactions() throws AMQException
+    {
+        // Enqueue a message to dequeue
+        testMultipleEnqueueTransactional();
+
+        StoreContext context = new StoreContext();
+
+        _transactionLog.beginTran(context);
+
+        _transactionLog.dequeueMessage(context, _queues.get(0), _message.getMessageId());
+
+        _transactionLog.commitTran(context);
+        ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId());
+        assertNotNull("Message not enqueued", enqueued);
+        assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0)));
+        assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size());
+
+        assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
+        assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+
+        _transactionLog.beginTran(context);
+
+        _transactionLog.dequeueMessage(context, _queues.get(1), _message.getMessageId());
+
+        _transactionLog.commitTran(context);
+
+        enqueued = _enqueues.get(_message.getMessageId());
+        assertNotNull("Message not enqueued", enqueued);
+        assertFalse("Message still enqueued on the second queue,", enqueued.contains(_queues.get(1)));
+        assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size());
+
+        assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
+        assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+
+        _transactionLog.beginTran(context);
+
+        _transactionLog.dequeueMessage(context, _queues.get(2), _message.getMessageId());
+
+        _transactionLog.commitTran(context);
+
+        assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
+        assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
+        assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+    }
+
+     public void testMultipleDequeueSingleTransaction() throws AMQException
+    {
+        // Enqueue a message to dequeue
+        testMultipleEnqueueTransactional();
+
+        StoreContext context = new StoreContext();
+
+        _transactionLog.beginTran(context);
+
+        _transactionLog.dequeueMessage(context, _queues.get(0), _message.getMessageId());
+
+        ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId());
+        assertNotNull("Message not enqueued", enqueued);
+        assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0)));
+        assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size());
+
+        assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
+        assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+
+
+        _transactionLog.dequeueMessage(context, _queues.get(1), _message.getMessageId());
+
+
+        enqueued = _enqueues.get(_message.getMessageId());
+        assertNotNull("Message not enqueued", enqueued);
+        assertFalse("Message still enqueued on the second queue,", enqueued.contains(_queues.get(1)));
+        assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size());
+
+        assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
+        assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+
+
+        _transactionLog.dequeueMessage(context, _queues.get(2), _message.getMessageId());
+
+        _transactionLog.commitTran(context);
+
+        assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
+        assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
+        assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+    }
+
+    private void verifyMessageStored(Long messageId)
+    {
+        assertTrue("MessageMD has not been stored", _storeMetaData.containsKey(messageId));
+        assertTrue("Messasge Chunk has not been stored", _storeChunks.containsKey(messageId));
+    }
+
+    private void verifyEnqueuedOnQueues(Long messageId, ArrayList<AMQQueue> queues)
+    {
+        ArrayList<AMQQueue> enqueues = _enqueues.get(messageId);
+
+        assertNotNull("Message not enqueued", enqueues);
+        assertEquals("Message is not enqueued on the right number of queues", queues.size(), enqueues.size());
+        for (AMQQueue queue : queues)
+        {
+            assertTrue("Message not enqueued on:" + queue, enqueues.contains(queue));
+        }
+    }
+
+    /*************************** TransactionLog *******************************
+     *
+     * Simple InMemory TransactionLog that actually records enqueues/dequeues
+     */
+
+    /**
+     * @param virtualHost The virtual host using by this store
+     * @param base        The base element identifier from which all configuration items are relative. For example, if
+     *                    the base element is "store", the all elements used by concrete classes will be "store.foo" etc.
+     * @param config      The apache commons configuration object.
+     *
+     * @return
+     *
+     * @throws Exception
+     */
+    public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+    {
+        return this;
+    }
+
+    public void close() throws Exception
+    {
+    }
+
+    public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
+    {
+        for (AMQQueue queue : queues)
+        {
+            enqueueMessage(messageId, queue);
+        }
+    }
+
+    private void enqueueMessage(Long messageId, AMQQueue queue)
+    {
+        ArrayList<AMQQueue> queues = _enqueues.get(messageId);
+
+        if (queues == null)
+        {
+            synchronized (_enqueues)
+            {
+                queues = _enqueues.get(messageId);
+                if (queues == null)
+                {
+                    queues = new ArrayList<AMQQueue>();
+                    _enqueues.put(messageId, queues);
+                }
+            }
+        }
+
+        synchronized (queues)
+        {
+            queues.add(queue);
+        }
+    }
+
+    public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+    {
+        ArrayList<AMQQueue> queues = _enqueues.get(messageId);
+
+        if (queues == null)
+        {
+            throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " +
+                                       "queue(" + queue + ") but no enqueue data available");
+        }
+
+        synchronized (queues)
+        {
+            if (!queues.contains(queue))
+            {
+                throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " +
+                                           "queue(" + queue + ") but no message not enqueued on queue");
+            }
+                       
+            queues.remove(queue);
+        }
+    }
+
+    public void removeMessage(StoreContext context, Long messageId) throws AMQException
+    {
+        ArrayList<AMQQueue> queues;
+
+        synchronized (_enqueues)
+        {
+            queues = _enqueues.remove(messageId);
+        }
+
+        if (queues == null)
+        {
+            throw new RuntimeException("Attempt to remove message(" + messageId + ") but " +
+                                       "no enqueue data available");
+        }
+
+        if (!queues.isEmpty())
+        {
+            throw new RuntimeException("Removed a message(" + messageId + ") that still had references.");
+        }
+
+        synchronized (_storeMetaData)
+        {
+            _storeMetaData.remove(messageId);
+        }
+
+        synchronized (_storeChunks)
+        {
+            _storeChunks.remove(messageId);
+        }
+
+    }
+
+    //
+    // This class does not attempt to operate transactionally. It only knows when it should be in a transaction.
+    //  Data is stored immediately.
+    //
+
+    public void beginTran(StoreContext context) throws AMQException
+    {
+        context.setPayload(new Object());
+    }
+
+    public void commitTran(StoreContext context) throws AMQException
+    {
+        context.setPayload(null);
+    }
+
+    public void abortTran(StoreContext context) throws AMQException
+    {
+        _inTransaction = false;
+    }
+
+    public boolean inTran(StoreContext context)
+    {
+        return _inTransaction;
+    }
+
+    public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+    {
+        ArrayList<ContentChunk> chunks = _storeChunks.get(messageId);
+
+        if (chunks == null)
+        {
+            synchronized (_storeChunks)
+            {
+                chunks = _storeChunks.get(messageId);
+                if (chunks == null)
+                {
+                    chunks = new ArrayList<ContentChunk>();
+                    _storeChunks.put(messageId, chunks);
+                }
+            }
+        }
+
+        synchronized (chunks)
+        {
+            chunks.add(contentBody);
+        }
+    }
+
+    public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+    {
+        if (_storeMetaData.get(messageId) != null)
+        {
+            throw new RuntimeException("Attempt to storeMessageMetaData for messageId(" + messageId + ") but data already exists");
+        }
+
+        synchronized (_storeMetaData)
+        {
+            _storeMetaData.put(messageId, messageMetaData);
+        }
+    }
+
+    public boolean isPersistent()
+    {
+        return false;
+    }
+}

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java?rev=761673&r1=761672&r2=761673&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java Fri Apr  3 13:38:11 2009
@@ -69,7 +69,7 @@
     public void testCommitWithFailureDuringPrepare() throws AMQException
     {
         MockStore store = new MockStore();
-        store.beginTran(null);
+        store.beginTran(new StoreContext());
 
         TxnBuffer buffer = new TxnBuffer();
         buffer.enlist(new StoreMessageOperation(store));
@@ -81,7 +81,7 @@
 
         try
         {
-            buffer.commit(null);
+            buffer.commit(new StoreContext());
         }
         catch (NoSuchElementException e)
         {
@@ -95,7 +95,7 @@
     public void testCommitWithPersistance() throws AMQException
     {
         MockStore store = new MockStore();
-        store.beginTran(null);
+        store.beginTran(new StoreContext());
         store.expectCommit();
 
         TxnBuffer buffer = new TxnBuffer();
@@ -105,7 +105,7 @@
         buffer.enlist(new StoreMessageOperation(store));
         buffer.enlist(new TxnTester(store));
 
-        buffer.commit(null);
+        buffer.commit(new StoreContext());
         validateOps();
         store.validate();
     }

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java?rev=761673&r1=761672&r2=761673&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java Fri Apr  3 13:38:11 2009
@@ -39,8 +39,7 @@
             _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env));
 
             assertNotNull(_virtualHost.getTransactionLog());
-            assertNotNull(_virtualHost.getRoutingTable());
-            assertEquals(_virtualHost.getTransactionLog(),_virtualHost.getRoutingTable());
+            assertNotNull(_virtualHost.getRoutingTable());            
         }
         catch (Exception e)
         {

Modified: qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java?rev=761673&r1=761672&r2=761673&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java (original)
+++ qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java Fri Apr  3 13:38:11 2009
@@ -36,6 +36,7 @@
 
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.ArrayList;
 
 public class SlowMessageStore implements TransactionLog, RoutingTable
 {
@@ -50,7 +51,7 @@
     private static final String POST = "post";
     private String DEFAULT_DELAY = "default";
 
-    public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+    public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
     {
         _logger.warn("Starting SlowMessageStore on Virtualhost:" + virtualHost.getName());
         Configuration delays = config.getStoreConfiguration().subset(DELAYS);
@@ -81,7 +82,11 @@
                 _realTransactionLog = (TransactionLog) o;
             }
         }
-        _realTransactionLog.configure(virtualHost, base , config);
+
+        // The call to configure may return a new transaction log 
+        _realTransactionLog = (TransactionLog) _realTransactionLog.configure(virtualHost, base , config);
+        
+        return this;
     }
 
     private void configureDelays(Configuration config)
@@ -205,10 +210,10 @@
         doPostDelay("removeQueue");
     }
 
-    public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+    public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
     {
         doPreDelay("enqueueMessage");
-        _realTransactionLog.enqueueMessage(context, queue, messageId);
+        _realTransactionLog.enqueueMessage(context, queues, messageId);
         doPostDelay("enqueueMessage");
     }
 
@@ -219,6 +224,13 @@
         doPostDelay("dequeueMessage");
     }
 
+    public void removeMessage(StoreContext context, Long messageId) throws AMQException
+    {
+        doPreDelay("dequeueMessage");
+        _realTransactionLog.removeMessage(context, messageId);
+        doPostDelay("dequeueMessage");
+    }
+
     public void beginTran(StoreContext context) throws AMQException
     {
         doPreDelay("beginTran");
@@ -262,22 +274,22 @@
         doPostDelay("storeMessageMetaData");
     }
 
-    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
-    {
-        doPreDelay("getMessageMetaData");
-        MessageMetaData mmd = _realTransactionLog.getMessageMetaData(context, messageId);
-        doPostDelay("getMessageMetaData");
-        return mmd;
-    }
-
-    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
-    {
-        doPreDelay("getContentBodyChunk");
-        ContentChunk c = _realTransactionLog.getContentBodyChunk(context, messageId, index);
-        doPostDelay("getContentBodyChunk");
-        return c;
-    }
-
+//    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+//    {
+//        doPreDelay("getMessageMetaData");
+//        MessageMetaData mmd = _realTransactionLog.getMessageMetaData(context, messageId);
+//        doPostDelay("getMessageMetaData");
+//        return mmd;
+//    }
+//
+//    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+//    {
+//        doPreDelay("getContentBodyChunk");
+//        ContentChunk c = _realTransactionLog.getContentBodyChunk(context, messageId, index);
+//        doPostDelay("getContentBodyChunk");
+//        return c;
+//    }
+//
     public boolean isPersistent()
     {
         return _realTransactionLog.isPersistent();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org