You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/02/27 14:37:03 UTC

svn commit: r748519 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/queue/ main/java/org/apache/qpid/server/virtualhost/ test/java/org/apache/qpid/server/exchange/ test/java/org/apache/qpid/server/queue/ test/java/org/apache/qpi...

Author: ritchiem
Date: Fri Feb 27 13:37:03 2009
New Revision: 748519

URL: http://svn.apache.org/viewvc?rev=748519&view=rev
Log:
QPID-1635,QPID-1636,QPID-1638 : Updated QueueEntries to contain additional values from AMQMessage (_flags and expiry); this allows the checking of immediate delivery and expiry on unloaded messages.
Updated nomenclature to use load/unload rather than the overloaded flow/recover.
Created new FileQueueBackingStoreFactory, which validates and creates the initial flowToDiskLocation and creates a new BackingStore.
Responsibility for FlowToDisk has been added to the QueueEntryLists. This will allow the easy unloading of the structure in the future. In order to do this the size, count and memory count properties had to be moved from the SimpleAMQQueue to the QueueEntryList.
An Inhaler thread was created in addition to the synchronous loading of messages. This is initiated as a result of a flowed QEL dropping below the minimumMemory value.

A test to ensure that the queue never exceeds its set memory usage and that the count does not go negative has been added to SimpleAMQQueueTest.

The SimpleAMQQueue is responsible for deciding when a message can be unloaded after delivery takes place. The QEL cannot decide this as there is no state for a message being marked as sent to a consumer, only Acquired and Dequeued. The unloaded message is only deleted after the QueueEntry is deleted from the QEL. This negates the need to recreate the data on disk if the message needs to be unloaded again.

All files/directories relating to FtD are created as deleteOnExit files so that under clean shutdown the VM will ensure that the files are deleted. On startup the flowToDiskLocation is also purged to ensure a clean starting point.

SimpleAMQQueueThreadPoolTest was augmented to take into account the new inhaler executor reference.

Added:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java
      - copied, changed from r748516, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java?rev=748519&r1=748518&r2=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java Fri Feb 27 13:37:03 2009
@@ -22,19 +22,12 @@
 
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.util.FileUtils;
-import org.apache.qpid.AMQException;
-import org.apache.commons.configuration.ConfigurationException;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -43,219 +36,142 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 public class FileQueueBackingStore implements QueueBackingStore
 {
     private static final Logger _log = Logger.getLogger(FileQueueBackingStore.class);
 
-    private AtomicBoolean _closed = new AtomicBoolean(false);
     private String _flowToDiskLocation;
-    private static final String QUEUE_BACKING_DIR = "queueBacking";
 
-    public void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException
+    public FileQueueBackingStore(String location)
     {
-        setFlowToDisk(virtualHost.getName(), config.getFlowToDiskLocation());
+        _flowToDiskLocation = location;
     }
 
-    private void setFlowToDisk(String vHostName, String location) throws ConfigurationException
+    public AMQMessage load(Long messageId)
     {
-        if (vHostName == null)
-        {
-            throw new ConfigurationException("Unable to setup to Flow to Disk as Virtualhost name was not specified");
-        }
+        _log.info("Loading Message (ID:" + messageId + ")");
 
-        if (location == null)
-        {
-            throw new ConfigurationException("Unable to setup to Flow to Disk as location was not specified.");
-        }
+        MessageMetaData mmd;
 
-        _flowToDiskLocation = location;
+        File handle = getFileHandle(messageId);
+        handle.deleteOnExit();
 
-        _flowToDiskLocation += File.separator + QUEUE_BACKING_DIR + File.separator + vHostName;
+        ObjectInputStream input = null;
 
-        File root = new File(location);
-        if (!root.exists())
-        {
-            throw new ConfigurationException("Specified Flow to Disk root does not exist:" + root.getAbsolutePath());
-        }
-        else
+        Exception error = null;
+        try
         {
+            input = new ObjectInputStream(new FileInputStream(handle));
 
-            if (root.isFile())
-            {
-                throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified root is a file:"+
-                           root.getAbsolutePath());
-            }
+            long arrivaltime = input.readLong();
+
+            final AMQShortString exchange = new AMQShortString(input.readUTF());
+            final AMQShortString routingKey = new AMQShortString(input.readUTF());
+            final boolean mandatory = input.readBoolean();
+            final boolean immediate = input.readBoolean();
+
+            int bodySize = input.readInt();
+            byte[] underlying = new byte[bodySize];
+
+            input.readFully(underlying, 0, bodySize);
 
-            if(!root.canWrite())
+            ByteBuffer buf = ByteBuffer.wrap(underlying);
+
+            ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, bodySize);
+
+            int chunkCount = input.readInt();
+
+            // There are WAY to many annonymous MPIs in the code this should be made concrete.
+            MessagePublishInfo info = new MessagePublishInfo()
             {
-                throw new ConfigurationException("Unable to create Temporary Flow to Disk store. Unable to write to specified root:"+
-                           root.getAbsolutePath());
-            }
 
-        }
+                public AMQShortString getExchange()
+                {
+                    return exchange;
+                }
+
+                public void setExchange(AMQShortString exchange)
+                {
+
+                }
+
+                public boolean isImmediate()
+                {
+                    return immediate;
+                }
+
+                public boolean isMandatory()
+                {
+                    return mandatory;
+                }
+
+                public AMQShortString getRoutingKey()
+                {
+                    return routingKey;
+                }
+            };
 
+            mmd = new MessageMetaData(info, chb, chunkCount);
+            mmd.setArrivalTime(arrivaltime);
 
-        File store = new File(_flowToDiskLocation);
-        if (store.exists())
-        {
-            if (!FileUtils.delete(store, true))
+            AMQMessage message;
+            if (((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2)
             {
-                throw new ConfigurationException("Unable to create Temporary Flow to Disk store as directory already exsits:"
-                           + store.getAbsolutePath());
+                message = new PersistentAMQMessage(messageId, null);
+            }
+            else
+            {
+                message = new TransientAMQMessage(messageId);
             }
 
-            if (store.isFile())
+            message.recoverFromMessageMetaData(mmd);
+
+            for (int chunk = 0; chunk < chunkCount; chunk++)
             {
-                throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified location is a file:"+
-                           store.getAbsolutePath());
+                int length = input.readInt();
+
+                byte[] data = new byte[length];
+
+                input.readFully(data, 0, length);
+
+                try
+                {
+                    message.recoverContentBodyFrame(new RecoverDataBuffer(length, data), (chunk + 1 == chunkCount));
+                }
+                catch (AMQException e)
+                {
+                    //ignore as this will not occur.
+                    // It is thrown by the _transactionLog method in load on PersistentAMQMessage
+                    // but we have created the message with a null log and will never call that method.
+                }
             }
 
+            return message;
         }
-        else
+        catch (Exception e)
         {
-            if (!store.getParentFile().getParentFile().canWrite())
-            {
-                throw new ConfigurationException("Unable to create Temporary Flow to Disk store. Unable to write to parent location:"+
-                           store.getParentFile().getParentFile().getAbsolutePath());
-            }
+            error = e;
         }
-
-
-        _log.info("Creating Flow to Disk Store : " + store.getAbsolutePath());
-        store.deleteOnExit();
-        if (!store.mkdirs())
+        finally
         {
-            throw new ConfigurationException("Unable to create Temporary Flow to Disk store:" + store.getAbsolutePath());
+            try
+            {
+                input.close();
+                // We can purge the message here then reflow it if required but I believe it to be cleaner to leave it
+                // on disk until it has been deleted from the queue at that point we can be sure we won't need the data
+                //handle.delete();
+            }
+            catch (IOException e)
+            {
+                _log.info("Unable to close input on message(" + messageId + ") recovery due to:" + e.getMessage());
+            }
         }
-    }
-
-
-    public AMQMessage recover(Long messageId)
-     {
-         MessageMetaData mmd;
-         List<ContentChunk> contentBodies = new LinkedList<ContentChunk>();
-
-         File handle = getFileHandle(messageId);
-         handle.deleteOnExit();
-
-         ObjectInputStream input = null;
-
-         Exception error = null;
-         try
-         {
-             input = new ObjectInputStream(new FileInputStream(handle));
-
-             long arrivaltime = input.readLong();
-
-             final AMQShortString exchange = new AMQShortString(input.readUTF());
-             final AMQShortString routingKey = new AMQShortString(input.readUTF());
-             final boolean mandatory = input.readBoolean();
-             final boolean immediate = input.readBoolean();
-
-             int bodySize = input.readInt();
-             byte[] underlying = new byte[bodySize];
-
-             input.readFully(underlying, 0, bodySize);
-
-             ByteBuffer buf = ByteBuffer.wrap(underlying);
-
-             ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, bodySize);
-
-             int chunkCount = input.readInt();
-
-             // There are WAY to many annonymous MPIs in the code this should be made concrete.
-             MessagePublishInfo info = new MessagePublishInfo()
-             {
-
-                 public AMQShortString getExchange()
-                 {
-                     return exchange;
-                 }
-
-                 public void setExchange(AMQShortString exchange)
-                 {
-
-                 }
-
-                 public boolean isImmediate()
-                 {
-                     return immediate;
-                 }
-
-                 public boolean isMandatory()
-                 {
-                     return mandatory;
-                 }
-
-                 public AMQShortString getRoutingKey()
-                 {
-                     return routingKey;
-                 }
-             };
-
-             mmd = new MessageMetaData(info, chb, chunkCount);
-             mmd.setArrivalTime(arrivaltime);
-
-             AMQMessage message;
-             if (((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2)
-             {
-                 message = new PersistentAMQMessage(messageId, null);
-             }
-             else
-             {
-                 message = new TransientAMQMessage(messageId);
-             }
-
-             message.recoverFromMessageMetaData(mmd);
-
-             for (int chunk = 0; chunk < chunkCount; chunk++)
-             {
-                 int length = input.readInt();
-
-                 byte[] data = new byte[length];
-
-                 input.readFully(data, 0, length);
-
-                 // There are WAY to many annonymous CCs in the code this should be made concrete.
-                 try
-                 {
-                     message.recoverContentBodyFrame(new RecoverDataBuffer(length, data), (chunk + 1 == chunkCount));
-                 }
-                 catch (AMQException e)
-                 {
-                     //ignore as this will not occur.
-                     // It is thrown by the _transactionLog method in recover on PersistentAMQMessage
-                     // but we have created the message with a null log and will never call that method.
-                 }
-             }
-
-             return message;
-         }
-         catch (Exception e)
-         {
-             error = e;
-         }
-         finally
-         {
-             try
-             {
-                 input.close();
-             }
-             catch (IOException e)
-             {
-                 _log.info("Unable to close input on message("+messageId+") recovery due to:"+e.getMessage());
-             }
-         }
 
         throw new UnableToRecoverMessageException(error);
     }
 
-
-    public void flow(AMQMessage message) throws UnableToFlowMessageException
+    public void unload(AMQMessage message) throws UnableToFlowMessageException
     {
         long messageId = message.getMessageId();
 
@@ -264,10 +180,18 @@
         //If we have written the data once then we don't need to do it again.
         if (handle.exists())
         {
-            _log.debug("Message(" + messageId + ") already flowed to disk.");
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Message(ID:" + messageId + ") already unloaded.");
+            }
             return;
         }
 
+        if (_log.isInfoEnabled())
+        {
+            _log.info("Unloading Message (ID:" + messageId + ")");
+        }
+
         handle.deleteOnExit();
 
         ObjectOutputStream writer = null;
@@ -334,7 +258,7 @@
 
         if (error != null)
         {
-            _log.error("Unable to flow message(" + messageId + ") to disk, restoring state.");
+            _log.error("Unable to unload message(" + messageId + ") to disk, restoring state.");
             handle.delete();
             throw new UnableToFlowMessageException(messageId, error);
         }
@@ -358,7 +282,7 @@
         // grab the 8 LSB to give us 256 bins
         long bin = messageId & 0xFFL;
 
-        String bin_path =_flowToDiskLocation + File.separator + bin;
+        String bin_path = _flowToDiskLocation + File.separator + bin;
         File bin_dir = new File(bin_path);
 
         if (!bin_dir.exists())
@@ -379,7 +303,10 @@
 
         if (handle.exists())
         {
-            _log.debug("Message(" + messageId + ") delete flowToDisk.");
+            if (_log.isInfoEnabled())
+            {
+                _log.info("Message(" + messageId + ") delete flowToDisk.");
+            }
             if (!handle.delete())
             {
                 throw new RuntimeException("Unable to delete flowToDisk data");

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java?rev=748519&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java Fri Feb 27 13:37:03 2009
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.util.FileUtils;
+
+import java.io.File;
+
+public class FileQueueBackingStoreFactory implements QueueBackingStoreFactory
+{
+    private static final Logger _log = Logger.getLogger(FileQueueBackingStoreFactory.class);
+
+    private String _flowToDiskLocation;
+    private static final String QUEUE_BACKING_DIR = "queueBacking";
+
+    public void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException
+    {
+        setFlowToDisk(virtualHost.getName(), config.getFlowToDiskLocation());
+    }
+
+    private void setFlowToDisk(String vHostName, String location) throws ConfigurationException
+    {
+        if (vHostName == null)
+        {
+            throw new ConfigurationException("Unable to setup to Flow to Disk as Virtualhost name was not specified");
+        }
+
+        if (location == null)
+        {
+            throw new ConfigurationException("Unable to setup to Flow to Disk as location was not specified.");
+        }
+
+        _flowToDiskLocation = location;
+
+        _flowToDiskLocation += File.separator + QUEUE_BACKING_DIR + File.separator + vHostName;
+
+        File root = new File(location);
+        if (!root.exists())
+        {
+            throw new ConfigurationException("Specified Flow to Disk root does not exist:" + root.getAbsolutePath());
+        }
+        else
+        {
+
+            if (root.isFile())
+            {
+                throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified root is a file:" +
+                                                 root.getAbsolutePath());
+            }
+
+            if (!root.canWrite())
+            {
+                throw new ConfigurationException("Unable to create Temporary Flow to Disk store. Unable to write to specified root:" +
+                                                 root.getAbsolutePath());
+            }
+
+        }
+        
+        // if we don't mark QUEUE_BAKCING_DIR as a deleteOnExit it will remain.        
+        File backingDir = new File(location + File.separator + QUEUE_BACKING_DIR);
+        if (backingDir.exists())
+        {
+            if (!FileUtils.delete(backingDir, true))
+            {
+                throw new ConfigurationException("Unable to delete existing Flow to Disk root at:"
+                                                 + backingDir.getAbsolutePath());
+            }
+
+            if (backingDir.isFile())
+            {
+                throw new ConfigurationException("Unable to create Temporary Flow to Disk root as specified location is a file:" +
+                                                 backingDir.getAbsolutePath());
+            }
+        }
+        
+        backingDir.deleteOnExit();
+        if (!backingDir.mkdirs())
+        {
+            throw new ConfigurationException("Unable to create Temporary Flow to Disk root:" + location + File.separator + QUEUE_BACKING_DIR);
+        }
+
+
+        File store = new File(_flowToDiskLocation);
+        if (store.exists())
+        {
+            if (!FileUtils.delete(store, true))
+            {
+                throw new ConfigurationException("Unable to delete existing Flow to Disk store at:"
+                                                 + store.getAbsolutePath());
+            }
+
+            if (store.isFile())
+            {
+                throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified location is a file:" +
+                                                 store.getAbsolutePath());
+            }
+
+        }
+
+        _log.info("Creating Flow to Disk Store : " + store.getAbsolutePath());
+        store.deleteOnExit();
+
+        if(!store.mkdir())
+        {
+            throw new ConfigurationException("Unable to create Temporary Flow to Disk store:" + store.getAbsolutePath());
+        }
+    }
+
+    public QueueBackingStore createBacking(AMQQueue queue)
+    {
+        return new FileQueueBackingStore(createStore(queue.getName().toString()));
+    }
+
+    private String createStore(String name)
+    {
+        return createStore(name, 0);
+    }
+
+    private String createStore(String name, int index)
+    {
+
+        String store = _flowToDiskLocation + File.separator + name;
+        if (index > 0)
+        {
+            store += "-" + index;
+        }
+
+        //TODO ensure store is safe for the OS
+
+        File storeFile = new File(store);
+
+        if (storeFile.exists())
+        {
+            return createStore(name, index + 1);
+        }
+
+        storeFile.mkdirs();
+
+        storeFile.deleteOnExit();
+
+        return store;
+    }
+
+}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java?rev=748519&r1=748518&r2=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java Fri Feb 27 13:37:03 2009
@@ -20,16 +20,18 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.log4j.Logger;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
-/**
- * This is an abstract base class to handle
- */
+/** This is an abstract base class to handle */
 public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryList
 {
     private static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class);
@@ -43,9 +45,12 @@
 
     /** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */
     private long _memoryUsageMinimum = 0;
-    private AtomicBoolean _flowed;
+    private volatile AtomicBoolean _flowed;
     private QueueBackingStore _backingStore;
     protected AMQQueue _queue;
+    private Executor _inhaler;
+    private AtomicBoolean _stopped;
+    private AtomicReference<MessageInhaler> _asynchronousInhaler = new AtomicReference(null);
 
     FlowableBaseQueueEntryList(AMQQueue queue)
     {
@@ -54,15 +59,18 @@
         VirtualHost vhost = queue.getVirtualHost();
         if (vhost != null)
         {
-            _backingStore = vhost.getQueueBackingStore();
+            _backingStore = vhost.getQueueBackingStoreFactory().createBacking(queue);
         }
+
+        _stopped = new AtomicBoolean(false);
+        _inhaler = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
     }
 
     public void setFlowed(boolean flowed)
     {
         if (_flowed.get() != flowed)
         {
-            _log.info("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")");
+            _log.warn("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")");
             _flowed.set(flowed);
         }
     }
@@ -94,14 +102,15 @@
         // Don't attempt to start the inhaler/purger unless we have a minimum value specified.
         if (_memoryUsageMaximum > 0)
         {
-            // If we've increased the max memory above what we have in memory then we can inhale more
-            if (_memoryUsageMaximum > _atomicQueueInMemory.get())
+            if (_memoryUsageMinimum == 0)
             {
-                //TODO start inhaler
+                setMemoryUsageMinimum(_memoryUsageMaximum / 2);
             }
-            else // if we have now have to much memory in use we need to purge.
+
+            // if we have now have to much memory in use we need to purge.
+            if (_memoryUsageMaximum < _atomicQueueInMemory.get())
             {
-                //TODO start purger
+                startPurger();
             }
         }
     }
@@ -118,19 +127,78 @@
         // Don't attempt to start the inhaler unless we have a minimum value specified.
         if (_memoryUsageMinimum > 0)
         {
-            // If we've increased the minimum memory above what we have in memory then we need to inhale more
-            if (_memoryUsageMinimum >= _atomicQueueInMemory.get())
+            checkAndStartLoader();
+        }
+    }
+
+    private void checkAndStartLoader()
+    {
+        // If we've increased the minimum memory above what we have in memory then we need to inhale more
+        if (_atomicQueueInMemory.get() <= _memoryUsageMinimum)
+        {
+            startInhaler();
+        }
+    }
+
+    private void startInhaler()
+    {
+        if (_flowed.get())
+        {
+            MessageInhaler inhaler = new MessageInhaler();
+
+            if (_asynchronousInhaler.compareAndSet(null, inhaler))
             {
-                //TODO start inhaler
+                _inhaler.execute(inhaler);
             }
         }
     }
 
+    private void startPurger()
+    {
+       //TODO create purger, used when maxMemory is reduced creating over memory situation.
+       _log.warn("Requested Purger Start.. purger TBC.");
+       //_purger.execute(new MessagePurger(this));
+    }
+
     public long getMemoryUsageMinimum()
     {
         return _memoryUsageMinimum;
     }
 
+    public void unloadEntry(QueueEntry queueEntry)
+    {
+        try
+        {
+            queueEntry.unload();
+            _atomicQueueInMemory.addAndGet(-queueEntry.getSize());
+            checkAndStartLoader();
+        }
+        catch (UnableToFlowMessageException e)
+        {
+            _atomicQueueInMemory.addAndGet(queueEntry.getSize());
+        }
+    }
+
+    public void loadEntry(QueueEntry queueEntry)
+    {
+        queueEntry.load();
+        if( _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
+        {
+            _log.error("Loaded to much data!:"+_atomicQueueInMemory.get()+"/"+_memoryUsageMaximum);
+        }
+    }
+
+    public void stop()
+    {
+        if (!_stopped.getAndSet(true))
+        {
+            // The SimpleAMQQueue keeps running when stopped so we should just release the services
+            // rather than actively shutdown our threads.
+            //Shutdown thread for inhaler.
+            ReferenceCountingExecutorService.getInstance().releaseExecutorService();
+        }
+    }
+
     protected boolean willCauseFlowToDisk(QueueEntryImpl queueEntry)
     {
         return _memoryUsageMaximum != 0 && memoryUsed() + queueEntry.getSize() > _memoryUsageMaximum;
@@ -153,13 +221,14 @@
 
     /**
      * Called when we are now flowing to disk
+     *
      * @param queueEntry the entry that is being flowed to disk
      */
     protected void flowingToDisk(QueueEntryImpl queueEntry)
     {
         try
         {
-            queueEntry.flow();
+            queueEntry.unload();
         }
         catch (UnableToFlowMessageException e)
         {
@@ -182,4 +251,71 @@
         return _backingStore;
     }
 
+    private class MessageInhaler implements Runnable
+    {   // Runnable handed to the inhaler executor; renames the executing thread while inhaleList runs.
+        public void run()
+        {
+            final String previousName = Thread.currentThread().getName();
+            Thread.currentThread().setName("Inhaler-" + _queue.getVirtualHost().getName() + "-" + _queue.getName());
+            try
+            {
+                inhaleList(this);
+            }
+            finally
+            {   // Put the original thread name back even if inhaleList throws.
+                Thread.currentThread().setName(previousName);
+            }
+        }
+    }
+
+    private void inhaleList(MessageInhaler messageInhaler)
+    {   // Loads flowed, AVAILABLE entries back into memory until the quota is reached or a pass loads nothing.
+        _log.info("Inhaler Running");
+        // If in memory count is at or over max then we can't inhale
+        if (_atomicQueueInMemory.get() >= _memoryUsageMaximum)
+        {
+            _log.debug("Unable to start inhaling as we are already over quota:" +
+                       _atomicQueueInMemory.get() + ">=" + _memoryUsageMaximum);
+            return;
+        }
+
+        // If this inhaler is currently registered as the active one, clear it so the claim below can succeed.
+        _asynchronousInhaler.compareAndSet(messageInhaler, null);
+        // Progress flag: without it a pass that reaches the tail while under quota would be repeated forever.
+        boolean loadedSomething = true;
+        while (loadedSomething && (_atomicQueueInMemory.get() < _memoryUsageMaximum)
+               && _asynchronousInhaler.compareAndSet(null, messageInhaler))
+        {
+            loadedSomething = false;
+            QueueEntryIterator iterator = iterator();
+            while (!iterator.getNode().isAvailable() && iterator.advance())
+            {
+                //Find first AVAILABLE node
+            }
+
+            while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && !iterator.atTail())
+            {
+                QueueEntry entry = iterator.getNode();
+                if (entry.isAvailable() && entry.isFlowed())
+                {
+                    loadEntry(entry);
+                    loadedSomething = true;
+                }
+                iterator.advance();
+            }
+            if (iterator.atTail())
+            {
+                setFlowed(false);
+            }
+            // Release the claim taken in the loop condition before re-evaluating it.
+            _asynchronousInhaler.set(null);
+        }
+
+        //If we have become flowed or have more capacity since we stopped then schedule the thread to run again.
+        if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
+        {
+            _inhaler.execute(messageInhaler);
+        }
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java?rev=748519&r1=748518&r2=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java Fri Feb 27 13:37:03 2009
@@ -20,7 +20,9 @@
  */
 package org.apache.qpid.server.queue;
 
-public interface FlowableQueueEntryList
+import java.util.concurrent.atomic.AtomicLong;
+
+public interface FlowableQueueEntryList extends QueueEntryList
 {
     void setFlowed(boolean flowed);
 
@@ -38,5 +40,19 @@
 
     void setMemoryUsageMinimum(long minimumMemoryUsage);
 
-    long getMemoryUsageMinimum();    
+    long getMemoryUsageMinimum();
+
+    /**
+     * Immediately unload Entry
+     * @param queueEntry the entry to unload
+     */
+    public void unloadEntry(QueueEntry queueEntry);
+
+    /**
+     * Immediately load Entry
+     * @param queueEntry the entry to load
+     */
+    public void loadEntry(QueueEntry queueEntry);
+
+    void stop();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java?rev=748519&r1=748518&r2=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java Fri Feb 27 13:37:03 2009
@@ -22,10 +22,10 @@
 
 import org.apache.qpid.framing.CommonContentHeaderProperties;
 
-public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implements  QueueEntryList
+public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList
 {
     private final AMQQueue _queue;
-    private final QueueEntryList[] _priorityLists;
+    private final FlowableQueueEntryList[] _priorityLists;
     private final int _priorities;
     private final int _priorityOffset;
 
@@ -33,7 +33,7 @@
     {
         super(queue);
         _queue = queue;
-        _priorityLists = new QueueEntryList[priorities];
+        _priorityLists = new FlowableQueueEntryList[priorities];
         _priorities = priorities;
         _priorityOffset = 5-((priorities + 1)/2);
         for(int i = 0; i < priorities; i++)
@@ -53,7 +53,7 @@
     }
 
     public QueueEntry add(AMQMessage message)
-    {        
+    {
         int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
         if(index >= _priorities)
         {
@@ -152,7 +152,7 @@
             _priorities = priorities;
         }
 
-        public QueueEntryList createQueueEntryList(AMQQueue queue)
+        public FlowableQueueEntryList createQueueEntryList(AMQQueue queue)
         {
             return new PriorityQueueEntryList(queue, _priorities);
         }
@@ -162,7 +162,7 @@
     public int size()
     {
         int size=0;
-        for (QueueEntryList queueEntryList : _priorityLists)
+        for (FlowableQueueEntryList queueEntryList : _priorityLists)
         {
             size += queueEntryList.size();
         }
@@ -174,9 +174,6 @@
     @Override
     protected void flowingToDisk(QueueEntryImpl queueEntry)
     {
-        //TODO this disables FTD for priority queues
-        // As the incomming message isn't always the one to purge.
-        // More logic is required up in the add() method here to determine if the
-        // incomming message is at the 'front' or not.
+        //This class doesn't maintain its own sizes; it delegates to the sub FlowableQueueEntryLists
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java?rev=748519&r1=748518&r2=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java Fri Feb 27 13:37:03 2009
@@ -26,11 +26,9 @@
 
 public interface QueueBackingStore
 {
-    void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException;
+    AMQMessage load(Long messageId);
 
-    AMQMessage recover(Long messageId);
-
-    void flow(AMQMessage message) throws UnableToFlowMessageException;
+    void unload(AMQMessage message) throws UnableToFlowMessageException;
 
     void delete(Long messageId);
 }

Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java (from r748516, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java&r1=748516&r2=748519&rev=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java Fri Feb 27 13:37:03 2009
@@ -20,17 +20,13 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.commons.configuration.ConfigurationException;
 
-public interface QueueBackingStore
+public interface QueueBackingStoreFactory
 {
     void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException;
 
-    AMQMessage recover(Long messageId);
-
-    void flow(AMQMessage message) throws UnableToFlowMessageException;
-
-    void delete(Long messageId);
+    public QueueBackingStore createBacking(AMQQueue queue);
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=748519&r1=748518&r2=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Fri Feb 27 13:37:03 2009
@@ -157,6 +157,8 @@
 
     boolean isAcquired();
 
+    boolean isAvailable();
+
     boolean acquire();
 
     boolean acquire(Subscription sub);
@@ -219,9 +221,9 @@
 
     boolean removeStateChangeListener(StateChangeListener listener);
 
-    void flow() throws UnableToFlowMessageException;
+    void unload() throws UnableToFlowMessageException;
 
-    void recover();
+    void load();
 
     boolean isFlowed();
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=748519&r1=748518&r2=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Feb 27 13:37:03 2009
@@ -181,6 +181,11 @@
         return _state.getState() == State.ACQUIRED;
     }
 
+    public boolean isAvailable()
+    {   // True only while this entry's current state is AVAILABLE.
+        return State.AVAILABLE == _state.getState();
+    }
+
     public boolean acquire()
     {
         return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE);
@@ -220,7 +225,15 @@
 
     public String debugIdentity()
     {
-        return getMessage().debugIdentity();
+        String entry="[State:"+_state.getState().name()+"]";
+        if (_message == null)
+        {
+            return entry+"(Message Unloaded ID:" + _messageId +")";
+        }
+        else
+        {
+            return entry+_message.debugIdentity();
+        }
     }
 
 
@@ -380,25 +393,29 @@
         return false;
     }
 
-    public void flow() throws UnableToFlowMessageException
+    public void unload() throws UnableToFlowMessageException
     {
         if (_message != null && _backingStore != null)
         {
             if(_log.isDebugEnabled())
             {
-                _log.debug("Flowing message:" + _message.debugIdentity());
+                _log.debug("Unloading:" + debugIdentity());
             }
-            _backingStore.flow(_message);
+            _backingStore.unload(_message);
             _message = null;
-            _flowed.getAndSet(true);            
+            _flowed.getAndSet(true);
         }
     }
 
-    public void recover()
+    public void load()
     {
         if (_messageId != null && _backingStore != null)
         {
-            _message = _backingStore.recover(_messageId);
+            _message = _backingStore.load(_messageId);
+            if(_log.isDebugEnabled())
+            {
+                _log.debug("Loading:" + debugIdentity());
+            }
             _flowed.getAndSet(false);
         }
     }
@@ -471,5 +488,4 @@
         return _queueEntryList;
     }
 
-
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=748519&r1=748518&r2=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Fri Feb 27 13:37:03 2009
@@ -20,7 +20,7 @@
 */
 package org.apache.qpid.server.queue;
 
-public interface QueueEntryList extends FlowableQueueEntryList
+public interface QueueEntryList
 {
     AMQQueue getQueue();
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java?rev=748519&r1=748518&r2=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java Fri Feb 27 13:37:03 2009
@@ -22,5 +22,5 @@
 
 interface QueueEntryListFactory
 {
-    public QueueEntryList createQueueEntryList(AMQQueue queue);
+    public FlowableQueueEntryList createQueueEntryList(AMQQueue queue);
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=748519&r1=748518&r2=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Fri Feb 27 13:37:03 2009
@@ -79,7 +79,7 @@
 
     private volatile Subscription _exclusiveSubscriber;
 
-    protected final QueueEntryList _entries;
+    protected final FlowableQueueEntryList _entries;
 
     private final AMQQueueMBean _managedObject;
     private final Executor _asyncDelivery;
@@ -465,8 +465,21 @@
             throws AMQException
     {
         _deliveredMessages.incrementAndGet();
+
+        if (entry.isFlowed())
+        {
+            _logger.debug("Synchronously loading flowed entry:" + entry.debugIdentity());
+            _entries.loadEntry(entry);
+        }
+
         sub.send(entry);
 
+        // We have delivered this message so we can unload it.
+        if (_entries.isFlowed() && entry.isAcquired() && entry.getDeliveredToConsumer())
+        {
+            _entries.unloadEntry(entry);
+        }
+
     }
 
     private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry)
@@ -1101,6 +1114,7 @@
         if (!_stopped.getAndSet(true))
         {
             ReferenceCountingExecutorService.getInstance().releaseExecutorService();
+            _entries.stop();
         }
     }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=748519&r1=748518&r2=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Fri Feb 27 13:37:03 2009
@@ -25,7 +25,7 @@
 * under the License.
 *
 */
-public class SimpleQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList
+public class SimpleQueueEntryList extends FlowableBaseQueueEntryList
 {
 
     private final QueueEntryImpl _head;
@@ -172,7 +172,7 @@
     static class Factory implements QueueEntryListFactory
     {
 
-        public QueueEntryList createQueueEntryList(AMQQueue queue)
+        public FlowableQueueEntryList createQueueEntryList(AMQQueue queue)
         {
             return new SimpleQueueEntryList(queue);
         }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=748519&r1=748518&r2=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Fri Feb 27 13:37:03 2009
@@ -49,7 +49,8 @@
 import org.apache.qpid.server.queue.DefaultQueueRegistry;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.queue.QueueBackingStore;
-import org.apache.qpid.server.queue.FileQueueBackingStore;
+import org.apache.qpid.server.queue.FileQueueBackingStoreFactory;
+import org.apache.qpid.server.queue.QueueBackingStoreFactory;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.routing.RoutingTable;
 import org.apache.qpid.server.security.access.ACLManager;
@@ -88,7 +89,7 @@
     private final Timer _houseKeepingTimer;
     
     private VirtualHostConfiguration _configuration;
-    private QueueBackingStore _queueBackingStore;
+    private QueueBackingStoreFactory _queueBackingStoreFactory;
 
     public void setAccessableName(String name)
     {
@@ -116,9 +117,9 @@
         return _configuration ;
     }
 
-    public QueueBackingStore getQueueBackingStore()
+    public QueueBackingStoreFactory getQueueBackingStoreFactory()
     {
-        return _queueBackingStore;
+        return _queueBackingStoreFactory;
     }
 
     /**
@@ -194,8 +195,8 @@
             initialiseRoutingTable(hostConfig);
         }
 
-        _queueBackingStore = new FileQueueBackingStore();
-        _queueBackingStore.configure(this,hostConfig);
+        _queueBackingStoreFactory = new FileQueueBackingStoreFactory();
+        _queueBackingStoreFactory.configure(this, hostConfig);
 
         _exchangeFactory.initialise(hostConfig);
         _exchangeRegistry.initialise();

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=748519&r1=748518&r2=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Fri Feb 27 13:37:03 2009
@@ -240,6 +240,11 @@
                     return false;  //To change body of implemented methods use File | Settings | File Templates.
                 }
 
+                public boolean isAvailable()
+                {
+                    return false;  // Stub: this test entry always reports itself as not available.
+                }
+
                 public boolean acquire()
                 {
                     return false;  //To change body of implemented methods use File | Settings | File Templates.
@@ -346,12 +351,12 @@
                     return false;  //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public void flow() throws UnableToFlowMessageException
+                public void unload() throws UnableToFlowMessageException
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public void recover()
+                public void load()
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
                 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=748519&r1=748518&r2=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Fri Feb 27 13:37:03 2009
@@ -23,7 +23,6 @@
 import junit.framework.AssertionFailedError;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
 
 import java.util.ArrayList;
@@ -64,7 +63,7 @@
         _queue.registerSubscription(_subscription, false);
         Thread.sleep(150);
 
-        ArrayList<QueueEntry> msgs = _subscription.getMessages();
+        ArrayList<QueueEntry> msgs = _subscription.getQueueEntries();
         try
         {
             assertEquals(new Long(1 + messagIDOffset), msgs.get(0).getMessage().getMessageId());

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java?rev=748519&r1=748518&r2=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java Fri Feb 27 13:37:03 2009
@@ -21,9 +21,9 @@
 package org.apache.qpid.server.queue;
 
 import junit.framework.TestCase;
+import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.configuration.Configuration;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
@@ -42,22 +42,26 @@
 
 public class FileQueueBackingStoreTest extends TestCase
 {
-    FileQueueBackingStore _backing;
+    QueueBackingStore _backing;
     private TransactionLog _transactionLog;
     VirtualHost _vhost;
-        VirtualHostConfiguration _vhostConfig;
+    VirtualHostConfiguration _vhostConfig;
+    FileQueueBackingStoreFactory _factory;
+    AMQQueue _queue;
 
     public void setUp() throws Exception
     {
-        _backing = new FileQueueBackingStore();
+        _factory = new FileQueueBackingStoreFactory();
         PropertiesConfiguration config = new PropertiesConfiguration();
         config.addProperty("store.class", MemoryMessageStore.class.getName());
         _vhostConfig = new VirtualHostConfiguration(this.getName() + "-Vhost", config);
         _vhost = new VirtualHost(_vhostConfig);
         _transactionLog = _vhost.getTransactionLog();
 
-        _backing.configure(_vhost, _vhost.getConfiguration());
+        _factory.configure(_vhost, _vhost.getConfiguration());
 
+        _queue = new SimpleAMQQueue(new AMQShortString(this.getName()), false, null, false, _vhost);
+        _backing = _factory.createBacking(_queue);
     }
 
     private void resetBacking(Configuration configuration) throws Exception
@@ -67,9 +71,11 @@
         _vhost = new VirtualHost(_vhostConfig);
         _transactionLog = _vhost.getTransactionLog();
 
-        _backing = new FileQueueBackingStore();
+        _factory = new FileQueueBackingStoreFactory();
+
+        _factory.configure(_vhost, _vhost.getConfiguration());
 
-        _backing.configure(_vhost, _vhost.getConfiguration());
+        _backing = _factory.createBacking(_queue);
     }
 
     public void testInvalidSetupRootExistsIsFile() throws Exception
@@ -171,18 +177,18 @@
                                                 chb);
         if (chb.bodySize > 0)
         {
-            ContentChunk chunk = new MockContentChunk((int) chb.bodySize/2);
+            ContentChunk chunk = new MockContentChunk((int) chb.bodySize / 2);
 
             original.addContentBodyFrame(null, chunk, false);
 
-            chunk = new MockContentChunk((int) chb.bodySize/2);
+            chunk = new MockContentChunk((int) chb.bodySize / 2);
 
-            original.addContentBodyFrame(null, chunk, true);            
+            original.addContentBodyFrame(null, chunk, true);
         }
 
-        _backing.flow(original);
+        _backing.unload(original);
 
-        AMQMessage fromDisk = _backing.recover(original.getMessageId());
+        AMQMessage fromDisk = _backing.load(original.getMessageId());
 
         assertEquals("Message IDs do not match", original.getMessageId(), fromDisk.getMessageId());
         assertEquals("Message arrival times do not match", original.getArrivalTime(), fromDisk.getArrivalTime());

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=748519&r1=748518&r2=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Fri Feb 27 13:37:03 2009
@@ -20,6 +20,7 @@
  * 
  */
 
+import junit.framework.AssertionFailedError;
 import junit.framework.TestCase;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.qpid.AMQException;
@@ -29,6 +30,7 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -58,7 +60,7 @@
     protected FieldTable _arguments = null;
 
     MessagePublishInfo info = new MessagePublishInfoImpl();
-    private static final long MESSAGE_SIZE = 100;
+    private static long MESSAGE_SIZE = 100;
 
     @Override
     protected void setUp() throws Exception
@@ -68,7 +70,7 @@
         ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1);
 
         PropertiesConfiguration env = new PropertiesConfiguration();
-        _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _transactionLog);
+        _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getSimpleName(), env), _transactionLog);
         applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
 
         _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments);
@@ -362,56 +364,69 @@
         // Create IncomingMessage and nondurable queue
         NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
 
+        MESSAGE_SIZE = 1;
+        long MEMORY_MAX = 500;
+        int MESSAGE_COUNT = (int) MEMORY_MAX * 2;
         //Set the Memory Usage to be very low
-        _queue.setMemoryUsageMaximum(10);
+        _queue.setMemoryUsageMaximum(MEMORY_MAX);        
 
-        for (int msgCount = 0; msgCount < 10; msgCount++)
+        for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++)
         {
             sendMessage(txnContext);
         }
 
         //Check that we can hold 10 messages without flowing
-        assertEquals(10, _queue.getMessageCount());
-        assertEquals(10, _queue.getMemoryUsageCurrent());
+        assertEquals(MESSAGE_COUNT / 2, _queue.getMessageCount());
+        assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
         assertTrue("Queue is flowed.", !_queue.isFlowed());
 
         // Send anothe and ensure we are flowed
         sendMessage(txnContext);
-        assertEquals(11, _queue.getMessageCount());
-        assertEquals(10, _queue.getMemoryUsageCurrent());
+        assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount());
+        assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent());
         assertTrue("Queue is not flowed.", _queue.isFlowed());
 
-        //send another 9 so there are 20msgs in total on the queue
-        for (int msgCount = 0; msgCount < 9; msgCount++)
+        //send the remaining (MESSAGE_COUNT / 2) - 1 messages so there are MESSAGE_COUNT msgs in total on the queue
+        for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++)
         {
             sendMessage(txnContext);
+
+            long usage = _queue.getMemoryUsageCurrent();
+            assertTrue("Queue has gone over quota:" + usage,
+                       usage <= _queue.getMemoryUsageMaximum());
+
+            assertTrue("Queue has a negative quota:" + usage,usage  > 0);
+
         }
-        assertEquals(20, _queue.getMessageCount());
-        assertEquals(10, _queue.getMemoryUsageCurrent());
+        assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+        assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
         assertTrue("Queue is not flowed.", _queue.isFlowed());
 
         _queue.registerSubscription(_subscription, false);
 
-        Thread.sleep(200);
+        int slept = 0;
+        while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10)
+        {
+            Thread.sleep(500);
+            slept++;
+        }
 
         //Ensure the messages are retreived
-        assertEquals("Not all messages were received.", 20, _subscription.getMessages().size());
+        assertEquals("Not all messages were received, slept:"+slept/2+"s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
 
-        //Ensure we got the content
-        for (int index = 0; index < 10; index++)
-        {
-            QueueEntry entry = _subscription.getMessages().get(index);            
-            assertNotNull("Message:" + index + " was null.", entry.getMessage());
-            assertTrue(!entry.isFlowed());
-        }
+        //Check the queue is still within its limits.
+        assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(),
+                   _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum());
 
-        //ensure we were received 10 flowed messages
-        for (int index = 10; index < 20; index++)
+        assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() > 0);
+
+        for (int index = 0; index < MESSAGE_COUNT; index++)
         {
-            QueueEntry entry = _subscription.getMessages().get(index);
-            assertNull("Message:" + index + " was not null.", entry.getMessage());
-            assertTrue(entry.isFlowed());
+            // Ensure that we have received the messages and it wasn't flushed to disk before we received it.
+            AMQMessage message = _subscription.getMessages().get(index);
+            assertNotNull("Message:" + message.debugIdentity() + " was null.", message);
         }
+
     }
 
     private void sendMessage(TransactionalContext txnContext) throws AMQException
@@ -419,7 +434,8 @@
         IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
 
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
-        contentHeaderBody.bodySize = 1;
+        contentHeaderBody.classId = BasicConsumeBodyImpl.CLASS_ID;
+        contentHeaderBody.bodySize = MESSAGE_SIZE;
         contentHeaderBody.properties = new BasicContentHeaderProperties();
         ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
         msg.setContentHeaderBody(contentHeaderBody);

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java?rev=748519&r1=748518&r2=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java Fri Feb 27 13:37:03 2009
@@ -46,7 +46,10 @@
 
             assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown());
 
-            assertEquals("References not increased", initialCount + 1, ReferenceCountingExecutorService.getInstance().getReferenceCount());
+            //This is +2 because:
+            // 1 - asyncDelivery Thread
+            // 2 - queueInhalerThread
+            assertEquals("References not increased", initialCount + 2, ReferenceCountingExecutorService.getInstance().getReferenceCount());
             
             queue.stop();
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=748519&r1=748518&r2=748519&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Fri Feb 27 13:37:03 2009
@@ -30,10 +30,13 @@
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
+import org.apache.log4j.Logger;
 
 public class MockSubscription implements Subscription
 {
+    private static final Logger _logger = Logger.getLogger(MockSubscription.class);
 
     private boolean _closed = false;
     private AMQShortString tag = new AMQShortString("mocktag");
@@ -41,8 +44,12 @@
     private StateListener _listener = null;
     private QueueEntry lastSeen = null;
     private State _state = State.ACTIVE;
-    private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
+    private ArrayList<QueueEntry> _queueEntries = new ArrayList<QueueEntry>();
     private final Lock _stateChangeLock = new ReentrantLock();
+    private ArrayList<AMQMessage> _messages = new ArrayList<AMQMessage>();
+
+
+
 
     public void close()
     {
@@ -136,10 +143,14 @@
     {
     }
 
-    public void send(QueueEntry msg) throws AMQException
+    public void send(QueueEntry entry) throws AMQException
     {
-        lastSeen = msg;
-        messages.add(msg);
+        _logger.info("Sending Message(" + entry.debugIdentity() + ")  to subscription:" + this);
+
+        lastSeen = entry;
+        _queueEntries.add(entry);
+        _messages.add(entry.getMessage());
+        entry.setDeliveredToSubscription();        
     }
 
     public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
@@ -173,8 +184,14 @@
         return false;
     }
 
-    public ArrayList<QueueEntry> getMessages()
+    public ArrayList<QueueEntry> getQueueEntries()
     {
-        return messages;
+        return _queueEntries;
     }
+
+    public ArrayList<AMQMessage> getMessages()
+    {
+        return _messages;
+    }
+
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org